From 5c138d54b8f0789cc94e70459f86cb58ef9c2ab9 Mon Sep 17 00:00:00 2001 From: Chili Palmer Date: Sat, 25 Apr 2026 19:24:51 +0200 Subject: [PATCH] fix: back to at least part-parallism, except sqlite --- lib/bds/desktop/shell_commands.ex | 7 +- lib/bds/embeddings.ex | 60 ++++++------ lib/bds/media.ex | 118 +++++++++++++++-------- lib/bds/posts.ex | 49 ++++++++-- lib/bds/rebuild.ex | 16 +++ lib/bds/search.ex | 40 ++++---- test/bds/desktop/shell_commands_test.exs | 14 ++- test/bds/posts_test.exs | 74 ++++++++++++++ 8 files changed, 278 insertions(+), 100 deletions(-) create mode 100644 lib/bds/rebuild.ex diff --git a/lib/bds/desktop/shell_commands.ex b/lib/bds/desktop/shell_commands.ex index 60c8883..aa12845 100644 --- a/lib/bds/desktop/shell_commands.ex +++ b/lib/bds/desktop/shell_commands.ex @@ -236,7 +236,12 @@ defmodule BDS.Desktop.ShellCommands do %{ name: "Rebuild Posts From Files", work: fn report -> - {:ok, posts} = Maintenance.rebuild_from_filesystem(project.id, "post", on_progress: report) + {:ok, posts} = + Maintenance.rebuild_from_filesystem(project.id, "post", + on_progress: report, + rebuild_embeddings: false + ) + report.(1.0, "Post rebuild complete") %{project_id: project.id, counts: %{posts: length(posts)}} end diff --git a/lib/bds/embeddings.ex b/lib/bds/embeddings.ex index c1375f6..ddcf36b 100644 --- a/lib/bds/embeddings.ex +++ b/lib/bds/embeddings.ex @@ -39,7 +39,11 @@ defmodule BDS.Embeddings do end def sync_post(%Post{} = post) do - sync_post(post, refresh_index: true) + if enabled_for_project?(post.project_id) do + sync_post_if_enabled(post, refresh_index: true) + else + :ok + end end def sync_post(post_id) when is_binary(post_id) do @@ -61,7 +65,7 @@ defmodule BDS.Embeddings do where: key.project_id == ^project_id and key.post_id not in ^post_ids ) - Enum.each(posts, &sync_post(&1, refresh_index: false)) + Enum.each(posts, &sync_post_if_enabled(&1, refresh_index: false)) :ok = rebuild_snapshot(project_id) {:ok, post_ids} else @@ -109,38 +113,34 @@ defmodule BDS.Embeddings do end end - defp sync_post(%Post{} = post, opts) do - if enabled_for_project?(post.project_id) do - body = resolve_post_body(post) - raw_text = compose_embedding_source(post.title, body) - content_hash = hash_text(raw_text) + defp sync_post_if_enabled(%Post{} = post, opts) do + body = resolve_post_body(post) + raw_text = compose_embedding_source(post.title, body) + content_hash = hash_text(raw_text) - case Repo.get_by(Key, post_id: post.id, project_id: post.project_id) do - %Key{content_hash: ^content_hash} -> - :ok + case Repo.get_by(Key, post_id: post.id, project_id: post.project_id) do + %Key{content_hash: ^content_hash} -> + :ok - existing_key -> - label = existing_key_label(existing_key) || next_label() - {:ok, vector} = embed_text(raw_text, post.language) + existing_key -> + label = existing_key_label(existing_key) || next_label() + {:ok, vector} = embed_text(raw_text, post.language) - (existing_key || %Key{}) - |> Key.changeset(%{ - label: label, - post_id: post.id, - project_id: post.project_id, - content_hash: content_hash, - vector: Jason.encode!(vector) - }) - |> Repo.insert_or_update() + (existing_key || %Key{}) + |> Key.changeset(%{ + label: label, + post_id: post.id, + project_id: post.project_id, + content_hash: content_hash, + vector: Jason.encode!(vector) + }) + |> Repo.insert_or_update() - if Keyword.get(opts, :refresh_index, true) do - :ok = rebuild_snapshot(post.project_id) - end + if Keyword.get(opts, :refresh_index, true) do + :ok = rebuild_snapshot(post.project_id) + end - :ok - end - else - :ok + :ok end end @@ -176,7 +176,7 @@ defmodule BDS.Embeddings do %Key{content_hash: ^content_hash} -> :ok _other -> :ok = - sync_post( + sync_post_if_enabled( %{post | content: if(post.content in [nil, ""], do: body, else: post.content)}, refresh_index: false ) diff --git a/lib/bds/media.ex b/lib/bds/media.ex index 1c9c566..5fa02db 100644 --- a/lib/bds/media.ex +++ b/lib/bds/media.ex @@ -7,6 +7,7 @@ defmodule BDS.Media do alias BDS.Media.Translation alias BDS.Persistence alias BDS.Projects + alias BDS.Rebuild alias BDS.Repo alias BDS.Search alias BDS.Sidecar @@ -322,6 +323,7 @@ defmodule BDS.Media do |> list_matching_files("*.meta") |> Enum.filter(&canonical_sidecar?/1) |> Enum.filter(&binary_exists_for_sidecar?/1) + |> Rebuild.parallel_map(&parse_canonical_sidecar(project, &1)) translation_sidecars = project @@ -329,6 +331,7 @@ defmodule BDS.Media do |> Path.join("media") |> list_matching_files("*.meta") |> Enum.filter(&translation_sidecar?/1) + |> Rebuild.parallel_map(&parse_translation_sidecar(&1)) total_files = length(canonical_sidecars) + length(translation_sidecars) :ok = report_rebuild_started(on_progress, total_files, "media files") @@ -336,8 +339,8 @@ defmodule BDS.Media do media_items = canonical_sidecars |> Enum.with_index(1) - |> Enum.map(fn {sidecar_path, index} -> - media = upsert_media_from_sidecar(project, sidecar_path) + |> Enum.map(fn {sidecar, index} -> + media = upsert_media_from_sidecar(project, sidecar, sync_search: false) :ok = report_rebuild_progress(on_progress, index, total_files, "media files") media end) @@ -349,51 +352,58 @@ defmodule BDS.Media do translation_sidecars |> Enum.with_index(length(canonical_sidecars) + 1) - |> Enum.each(fn {sidecar_path, index} -> - upsert_translation_from_sidecar(project, canonical_media_by_binary_path, sidecar_path) + |> Enum.each(fn {sidecar, index} -> + upsert_translation_from_sidecar(project, canonical_media_by_binary_path, sidecar, sync_search: false) :ok = report_rebuild_progress(on_progress, index, total_files, "media files") end) + if Keyword.get(opts, :reindex_search, true) do + :ok = report_rebuild_phase(on_progress, 0.99, "Refreshing media search index") + :ok = Search.reindex_media(project.id) + end + {:ok, media_items} end - defp upsert_media_from_sidecar(project, sidecar_path) do - {:ok, fields} = sidecar_path |> File.read!() |> Sidecar.parse_document() - relative_sidecar_path = Path.relative_to(sidecar_path, Projects.project_data_dir(project)) - relative_file_path = String.trim_trailing(relative_sidecar_path, ".meta") - filename = Path.basename(relative_file_path) + defp upsert_media_from_sidecar(project, sidecar, opts) do now = Persistence.now_ms() attrs = %{ - id: Map.get(fields, "id") || Ecto.UUID.generate(), + id: Map.get(sidecar.fields, "id") || Ecto.UUID.generate(), project_id: project.id, - filename: filename, - original_name: Map.get(fields, "originalName") || filename, - mime_type: Map.get(fields, "mimeType") || detect_mime(filename), - size: Map.get(fields, "size", 0), - width: blank_to_nil(Map.get(fields, "width")), - height: blank_to_nil(Map.get(fields, "height")), - title: Map.get(fields, "title"), - alt: Map.get(fields, "alt"), - caption: Map.get(fields, "caption"), - author: Map.get(fields, "author"), - language: Map.get(fields, "language"), - file_path: relative_file_path, - sidecar_path: relative_sidecar_path, + filename: sidecar.filename, + original_name: Map.get(sidecar.fields, "originalName") || sidecar.filename, + mime_type: Map.get(sidecar.fields, "mimeType") || detect_mime(sidecar.filename), + size: Map.get(sidecar.fields, "size", 0), + width: blank_to_nil(Map.get(sidecar.fields, "width")), + height: blank_to_nil(Map.get(sidecar.fields, "height")), + title: Map.get(sidecar.fields, "title"), + alt: Map.get(sidecar.fields, "alt"), + caption: Map.get(sidecar.fields, "caption"), + author: Map.get(sidecar.fields, "author"), + language: Map.get(sidecar.fields, "language"), + file_path: sidecar.relative_file_path, + sidecar_path: sidecar.relative_sidecar_path, checksum: nil, - tags: Map.get(fields, "tags", []), - created_at: Map.get(fields, "createdAt", now), - updated_at: Map.get(fields, "updatedAt", now) + tags: Map.get(sidecar.fields, "tags", []), + created_at: Map.get(sidecar.fields, "createdAt", now), + updated_at: Map.get(sidecar.fields, "updatedAt", now) } media = Repo.get(Media, attrs.id) || - Repo.get_by(Media, project_id: project.id, file_path: relative_file_path) || %Media{} + Repo.get_by(Media, project_id: project.id, file_path: sidecar.relative_file_path) || %Media{} + + media = + media + |> Media.changeset(attrs) + |> Repo.insert_or_update!() + + if Keyword.get(opts, :sync_search, true) do + :ok = Search.sync_media(media) + end media - |> Media.changeset(attrs) - |> Repo.insert_or_update!() - |> tap(&Search.sync_media/1) end defp write_sidecar(project, media) do @@ -443,17 +453,14 @@ defmodule BDS.Media do ) end - defp upsert_translation_from_sidecar(project, canonical_media_by_binary_path, sidecar_path) do - binary_path = binary_path_for_translation_sidecar(sidecar_path) - - case Map.get(canonical_media_by_binary_path, binary_path) do + defp upsert_translation_from_sidecar(project, canonical_media_by_binary_path, sidecar, opts) do + case Map.get(canonical_media_by_binary_path, sidecar.binary_path) do nil -> :skip media -> - {:ok, fields} = sidecar_path |> File.read!() |> Sidecar.parse_document() now = Persistence.now_ms() - language = Map.fetch!(fields, "language") + language = Map.fetch!(sidecar.fields, "language") translation = Repo.get_by(Translation, translation_for: media.id, language: language) || @@ -465,18 +472,42 @@ defmodule BDS.Media do project_id: project.id, translation_for: media.id, language: language, - title: Map.get(fields, "title"), - alt: Map.get(fields, "alt"), - caption: Map.get(fields, "caption"), + title: Map.get(sidecar.fields, "title"), + alt: Map.get(sidecar.fields, "alt"), + caption: Map.get(sidecar.fields, "caption"), created_at: translation.created_at || now, updated_at: now }) |> Repo.insert_or_update!() - :ok = Search.sync_media(media.id) + if Keyword.get(opts, :sync_search, true) do + :ok = Search.sync_media(media.id) + end end end + defp parse_canonical_sidecar(project, sidecar_path) do + {:ok, fields} = sidecar_path |> File.read!() |> Sidecar.parse_document() + relative_sidecar_path = Path.relative_to(sidecar_path, Projects.project_data_dir(project)) + relative_file_path = String.trim_trailing(relative_sidecar_path, ".meta") + + %{ + fields: fields, + relative_sidecar_path: relative_sidecar_path, + relative_file_path: relative_file_path, + filename: Path.basename(relative_file_path) + } + end + + defp parse_translation_sidecar(sidecar_path) do + {:ok, fields} = sidecar_path |> File.read!() |> Sidecar.parse_document() + + %{ + fields: fields, + binary_path: binary_path_for_translation_sidecar(sidecar_path) + } + end + defp ensure_thumbnails(project, media) do if image_mime?(media.mime_type) do source_path = Path.join(Projects.project_data_dir(project), media.file_path) @@ -673,4 +704,11 @@ defmodule BDS.Media do callback.(0.05 + 0.95 * (current / total), "Rebuilding #{label} (#{current}/#{total})") :ok end + + defp report_rebuild_phase(nil, _progress, _message), do: :ok + + defp report_rebuild_phase(callback, progress, message) do + callback.(progress, message) + :ok + end end diff --git a/lib/bds/posts.ex b/lib/bds/posts.ex index a6ffcad..23e305d 100644 --- a/lib/bds/posts.ex +++ b/lib/bds/posts.ex @@ -12,6 +12,7 @@ defmodule BDS.Posts do alias BDS.Posts.Post alias BDS.Posts.Translation alias BDS.Projects + alias BDS.Rebuild alias BDS.Repo alias BDS.Search alias BDS.Slug @@ -145,7 +146,7 @@ defmodule BDS.Posts do |> Projects.project_data_dir() |> Path.join("posts") |> list_matching_files("*.md") - |> Enum.map(&parse_rebuild_file(project, &1)) + |> Rebuild.parallel_map(&parse_rebuild_file(project, &1)) total_files = length(rebuild_files) :ok = report_rebuild_started(on_progress, total_files, "post files") @@ -156,7 +157,7 @@ defmodule BDS.Posts do post_files |> Enum.with_index(1) |> Enum.map(fn {file, index} -> - post = upsert_post_from_rebuild_file(project_id, file) + post = upsert_post_from_rebuild_file(project_id, file, sync_search: false, sync_embeddings: false) :ok = report_rebuild_progress(on_progress, index, total_files, "post files") post end) @@ -164,10 +165,20 @@ defmodule BDS.Posts do translation_files |> Enum.with_index(length(post_files) + 1) |> Enum.each(fn {file, index} -> - upsert_post_translation_from_rebuild_file(project_id, file) + upsert_post_translation_from_rebuild_file(project_id, file, sync_search: false) :ok = report_rebuild_progress(on_progress, index, total_files, "post files") end) + if Keyword.get(opts, :reindex_search, true) do + :ok = report_rebuild_phase(on_progress, 0.97, "Refreshing post search index") + :ok = Search.reindex_posts(project_id) + end + + if Keyword.get(opts, :rebuild_embeddings, true) do + :ok = report_rebuild_phase(on_progress, 0.99, "Refreshing post embeddings") + {:ok, _rebuilt_post_ids} = Embeddings.rebuild_project(project_id) + end + {:ok, posts} end @@ -658,7 +669,7 @@ defmodule BDS.Posts do upsert_post_from_rebuild_file(project_id, rebuild_file) end - defp upsert_post_from_rebuild_file(project_id, rebuild_file) do + defp upsert_post_from_rebuild_file(project_id, rebuild_file, opts \\ []) do fields = rebuild_file.fields now = Persistence.now_ms() @@ -693,14 +704,23 @@ defmodule BDS.Posts do Repo.get_by(Post, project_id: project_id, file_path: rebuild_file.relative_path) || Repo.get_by(Post, project_id: project_id, slug: attrs.slug) || %Post{} + post = + post + |> Post.changeset(attrs) + |> Repo.insert_or_update!() + + if Keyword.get(opts, :sync_search, true) do + :ok = Search.sync_post(post) + end + + if Keyword.get(opts, :sync_embeddings, true) do + :ok = Embeddings.sync_post(post) + end + post - |> Post.changeset(attrs) - |> Repo.insert_or_update!() - |> tap(&Search.sync_post/1) - |> tap(&Embeddings.sync_post/1) end - defp upsert_post_translation_from_rebuild_file(project_id, rebuild_file) do + defp upsert_post_translation_from_rebuild_file(project_id, rebuild_file, opts) do fields = rebuild_file.fields source_post_id = Map.fetch!(fields, "translationFor") source_post = Repo.get_by!(Post, project_id: project_id, id: source_post_id) @@ -730,7 +750,9 @@ defmodule BDS.Posts do |> Translation.changeset(attrs) |> Repo.insert_or_update!() |> tap(fn _translation -> - :ok = Search.sync_post(source_post_id) + if Keyword.get(opts, :sync_search, true) do + :ok = Search.sync_post(source_post_id) + end end) end @@ -984,4 +1006,11 @@ defmodule BDS.Posts do callback.(0.05 + 0.95 * (current / total), "Rebuilding #{label} (#{current}/#{total})") :ok end + + defp report_rebuild_phase(nil, _progress, _message), do: :ok + + defp report_rebuild_phase(callback, progress, message) do + callback.(progress, message) + :ok + end end diff --git a/lib/bds/rebuild.ex b/lib/bds/rebuild.ex new file mode 100644 index 0000000..736af49 --- /dev/null +++ b/lib/bds/rebuild.ex @@ -0,0 +1,16 @@ +defmodule BDS.Rebuild do + @moduledoc false + + def parallel_map(items, mapper, opts \\ []) when is_list(items) and is_function(mapper, 1) do + max_concurrency = Keyword.get(opts, :max_concurrency, System.schedulers_online()) + ordered = Keyword.get(opts, :ordered, true) + timeout = Keyword.get(opts, :timeout, :infinity) + + items + |> Task.async_stream(mapper, max_concurrency: max_concurrency, ordered: ordered, timeout: timeout) + |> Enum.map(fn + {:ok, item} -> item + {:exit, reason} -> exit(reason) + end) + end +end diff --git a/lib/bds/search.ex b/lib/bds/search.ex index 084458d..ac86c04 100644 --- a/lib/bds/search.ex +++ b/lib/bds/search.ex @@ -135,7 +135,7 @@ defmodule BDS.Search do ) Repo.all(from post in Post, where: post.project_id == ^project_id) - |> Enum.each(&sync_post/1) + |> Enum.each(&insert_post_index/1) :ok end @@ -147,21 +147,14 @@ defmodule BDS.Search do ) Repo.all(from media in Media, where: media.project_id == ^project_id) - |> Enum.each(&sync_media/1) + |> Enum.each(&insert_media_index/1) :ok end def sync_post(%Post{} = post) do delete_post(post.id) - - {title, excerpt, content, tags, categories} = post_index_fields(post) - - Repo.query!( - "INSERT INTO posts_fts (post_id, title, excerpt, content, tags, categories) VALUES (?, ?, ?, ?, ?, ?)", - [post.id, title, excerpt, content, tags, categories] - ) - + insert_post_index(post) :ok end @@ -181,14 +174,7 @@ defmodule BDS.Search do def sync_media(%Media{} = media) do delete_media(media.id) - - {title, alt, caption, original_name, tags} = media_index_fields(media) - - Repo.query!( - "INSERT INTO media_fts (media_id, title, alt, caption, original_name, tags) VALUES (?, ?, ?, ?, ?, ?)", - [media.id, title, alt, caption, original_name, tags] - ) - + insert_media_index(media) :ok end @@ -206,6 +192,24 @@ defmodule BDS.Search do :ok end + defp insert_post_index(%Post{} = post) do + {title, excerpt, content, tags, categories} = post_index_fields(post) + + Repo.query!( + "INSERT INTO posts_fts (post_id, title, excerpt, content, tags, categories) VALUES (?, ?, ?, ?, ?, ?)", + [post.id, title, excerpt, content, tags, categories] + ) + end + + defp insert_media_index(%Media{} = media) do + {title, alt, caption, original_name, tags} = media_index_fields(media) + + Repo.query!( + "INSERT INTO media_fts (media_id, title, alt, caption, original_name, tags) VALUES (?, ?, ?, ?, ?, ?)", + [media.id, title, alt, caption, original_name, tags] + ) + end + defp candidate_post_ids(project_id, query, language) do if blank_query?(query) do Repo.all(from post in Post, where: post.project_id == ^project_id, select: post.id) diff --git a/test/bds/desktop/shell_commands_test.exs b/test/bds/desktop/shell_commands_test.exs index 487c292..f8ec43e 100644 --- a/test/bds/desktop/shell_commands_test.exs +++ b/test/bds/desktop/shell_commands_test.exs @@ -323,10 +323,22 @@ defmodule BDS.Desktop.ShellCommandsTest do ) assert progressed.group_name == "Maintenance" - assert progressed.message =~ "Rebuilding post files" + assert String.contains?(progressed.message, "post") assert wait_for_task(progressed.id, &(&1.status == :completed and &1.progress == 1.0), 5_000).status == :completed + + tasks = wait_for_tasks_by_name([ + "Rebuild Posts From Files", + "Rebuild Media From Files", + "Rebuild Scripts From Files", + "Rebuild Templates From Files", + "Rebuild Post Links", + "Regenerate Missing Thumbnails", + "Rebuild Embedding Index" + ], &(&1.status == :completed), 20_000) + + assert Enum.all?(tasks, &(&1.status == :completed)) end test "reindex_text queues a tracked background task for the active project", %{project: project} do diff --git a/test/bds/posts_test.exs b/test/bds/posts_test.exs index c190f4b..5c0917a 100644 --- a/test/bds/posts_test.exs +++ b/test/bds/posts_test.exs @@ -618,6 +618,68 @@ defmodule BDS.PostsTest do refute BDS.Repo.get(BDS.Posts.Post, stale_post.id) end + test "rebuild_posts_from_files batches search and embedding refresh after import", %{project: project} do + assert {:ok, _metadata} = + BDS.Metadata.update_project_metadata(project.id, %{semantic_similarity_enabled: true}) + + posts_dir = Path.join([BDS.Projects.project_data_dir(project), "posts", "2026", "04"]) + File.mkdir_p!(posts_dir) + + Enum.each(1..3, fn index -> + slug = "batched-post-#{index}" + + File.write!( + Path.join(posts_dir, "#{slug}.md"), + [ + "---", + "id: #{slug}", + "title: Batched Post #{index}", + "slug: #{slug}", + "status: published", + "language: en", + "createdAt: 1711843200", + "updatedAt: 1711929600", + "publishedAt: 1712016000", + "tags:", + "categories:", + "---", + "Body #{index}", + "" + ] + |> Enum.join("\n") + ) + end) + + handler_id = "posts-rebuild-batch-#{System.unique_integer([:positive])}" + + :ok = + :telemetry.attach( + handler_id, + [:bds, :repo, :query], + &__MODULE__.handle_repo_query/4, + self() + ) + + on_exit(fn -> :telemetry.detach(handler_id) end) + + assert {:ok, posts} = BDS.Posts.rebuild_posts_from_files(project.id) + assert length(posts) == 3 + + queries = drain_repo_queries([]) + + assert count_queries(queries, "settings") <= 12 + assert count_queries(queries, "posts_fts") <= 4 + + assert {:ok, results} = BDS.Search.search_posts(project.id, "Batched Post", %{}) + assert results.total == 3 + + assert {:ok, %{indexed: 3, total: 3}} = BDS.Embeddings.get_indexing_progress(project.id) + end + + def handle_repo_query(_event, _measurements, metadata, owner_pid) do + send(owner_pid, {:repo_query, metadata.query || ""}) + end + defp errors_on(changeset) do Ecto.Changeset.traverse_errors(changeset, fn {message, opts} -> Regex.replace(~r"%{(\w+)}", message, fn _, key -> @@ -625,4 +687,16 @@ defmodule BDS.PostsTest do end) end) end + + defp drain_repo_queries(acc) do + receive do + {:repo_query, query} -> drain_repo_queries([query | acc]) + after + 0 -> Enum.reverse(acc) + end + end + + defp count_queries(queries, fragment) do + Enum.count(queries, &String.contains?(&1, fragment)) + end end