fix: back to at least part-parallism, except sqlite

This commit is contained in:
2026-04-25 19:24:51 +02:00
parent 7c73b984dc
commit 5c138d54b8
8 changed files with 278 additions and 100 deletions

View File

@@ -236,7 +236,12 @@ defmodule BDS.Desktop.ShellCommands do
%{
name: "Rebuild Posts From Files",
work: fn report ->
{:ok, posts} = Maintenance.rebuild_from_filesystem(project.id, "post", on_progress: report)
{:ok, posts} =
Maintenance.rebuild_from_filesystem(project.id, "post",
on_progress: report,
rebuild_embeddings: false
)
report.(1.0, "Post rebuild complete")
%{project_id: project.id, counts: %{posts: length(posts)}}
end

View File

@@ -39,7 +39,11 @@ defmodule BDS.Embeddings do
end
def sync_post(%Post{} = post) do
sync_post(post, refresh_index: true)
if enabled_for_project?(post.project_id) do
sync_post_if_enabled(post, refresh_index: true)
else
:ok
end
end
def sync_post(post_id) when is_binary(post_id) do
@@ -61,7 +65,7 @@ defmodule BDS.Embeddings do
where: key.project_id == ^project_id and key.post_id not in ^post_ids
)
Enum.each(posts, &sync_post(&1, refresh_index: false))
Enum.each(posts, &sync_post_if_enabled(&1, refresh_index: false))
:ok = rebuild_snapshot(project_id)
{:ok, post_ids}
else
@@ -109,38 +113,34 @@ defmodule BDS.Embeddings do
end
end
defp sync_post(%Post{} = post, opts) do
if enabled_for_project?(post.project_id) do
body = resolve_post_body(post)
raw_text = compose_embedding_source(post.title, body)
content_hash = hash_text(raw_text)
defp sync_post_if_enabled(%Post{} = post, opts) do
body = resolve_post_body(post)
raw_text = compose_embedding_source(post.title, body)
content_hash = hash_text(raw_text)
case Repo.get_by(Key, post_id: post.id, project_id: post.project_id) do
%Key{content_hash: ^content_hash} ->
:ok
case Repo.get_by(Key, post_id: post.id, project_id: post.project_id) do
%Key{content_hash: ^content_hash} ->
:ok
existing_key ->
label = existing_key_label(existing_key) || next_label()
{:ok, vector} = embed_text(raw_text, post.language)
existing_key ->
label = existing_key_label(existing_key) || next_label()
{:ok, vector} = embed_text(raw_text, post.language)
(existing_key || %Key{})
|> Key.changeset(%{
label: label,
post_id: post.id,
project_id: post.project_id,
content_hash: content_hash,
vector: Jason.encode!(vector)
})
|> Repo.insert_or_update()
(existing_key || %Key{})
|> Key.changeset(%{
label: label,
post_id: post.id,
project_id: post.project_id,
content_hash: content_hash,
vector: Jason.encode!(vector)
})
|> Repo.insert_or_update()
if Keyword.get(opts, :refresh_index, true) do
:ok = rebuild_snapshot(post.project_id)
end
if Keyword.get(opts, :refresh_index, true) do
:ok = rebuild_snapshot(post.project_id)
end
:ok
end
else
:ok
:ok
end
end
@@ -176,7 +176,7 @@ defmodule BDS.Embeddings do
%Key{content_hash: ^content_hash} -> :ok
_other ->
:ok =
sync_post(
sync_post_if_enabled(
%{post | content: if(post.content in [nil, ""], do: body, else: post.content)},
refresh_index: false
)

View File

@@ -7,6 +7,7 @@ defmodule BDS.Media do
alias BDS.Media.Translation
alias BDS.Persistence
alias BDS.Projects
alias BDS.Rebuild
alias BDS.Repo
alias BDS.Search
alias BDS.Sidecar
@@ -322,6 +323,7 @@ defmodule BDS.Media do
|> list_matching_files("*.meta")
|> Enum.filter(&canonical_sidecar?/1)
|> Enum.filter(&binary_exists_for_sidecar?/1)
|> Rebuild.parallel_map(&parse_canonical_sidecar(project, &1))
translation_sidecars =
project
@@ -329,6 +331,7 @@ defmodule BDS.Media do
|> Path.join("media")
|> list_matching_files("*.meta")
|> Enum.filter(&translation_sidecar?/1)
|> Rebuild.parallel_map(&parse_translation_sidecar(&1))
total_files = length(canonical_sidecars) + length(translation_sidecars)
:ok = report_rebuild_started(on_progress, total_files, "media files")
@@ -336,8 +339,8 @@ defmodule BDS.Media do
media_items =
canonical_sidecars
|> Enum.with_index(1)
|> Enum.map(fn {sidecar_path, index} ->
media = upsert_media_from_sidecar(project, sidecar_path)
|> Enum.map(fn {sidecar, index} ->
media = upsert_media_from_sidecar(project, sidecar, sync_search: false)
:ok = report_rebuild_progress(on_progress, index, total_files, "media files")
media
end)
@@ -349,51 +352,58 @@ defmodule BDS.Media do
translation_sidecars
|> Enum.with_index(length(canonical_sidecars) + 1)
|> Enum.each(fn {sidecar_path, index} ->
upsert_translation_from_sidecar(project, canonical_media_by_binary_path, sidecar_path)
|> Enum.each(fn {sidecar, index} ->
upsert_translation_from_sidecar(project, canonical_media_by_binary_path, sidecar, sync_search: false)
:ok = report_rebuild_progress(on_progress, index, total_files, "media files")
end)
if Keyword.get(opts, :reindex_search, true) do
:ok = report_rebuild_phase(on_progress, 0.99, "Refreshing media search index")
:ok = Search.reindex_media(project.id)
end
{:ok, media_items}
end
defp upsert_media_from_sidecar(project, sidecar_path) do
{:ok, fields} = sidecar_path |> File.read!() |> Sidecar.parse_document()
relative_sidecar_path = Path.relative_to(sidecar_path, Projects.project_data_dir(project))
relative_file_path = String.trim_trailing(relative_sidecar_path, ".meta")
filename = Path.basename(relative_file_path)
defp upsert_media_from_sidecar(project, sidecar, opts) do
now = Persistence.now_ms()
attrs = %{
id: Map.get(fields, "id") || Ecto.UUID.generate(),
id: Map.get(sidecar.fields, "id") || Ecto.UUID.generate(),
project_id: project.id,
filename: filename,
original_name: Map.get(fields, "originalName") || filename,
mime_type: Map.get(fields, "mimeType") || detect_mime(filename),
size: Map.get(fields, "size", 0),
width: blank_to_nil(Map.get(fields, "width")),
height: blank_to_nil(Map.get(fields, "height")),
title: Map.get(fields, "title"),
alt: Map.get(fields, "alt"),
caption: Map.get(fields, "caption"),
author: Map.get(fields, "author"),
language: Map.get(fields, "language"),
file_path: relative_file_path,
sidecar_path: relative_sidecar_path,
filename: sidecar.filename,
original_name: Map.get(sidecar.fields, "originalName") || sidecar.filename,
mime_type: Map.get(sidecar.fields, "mimeType") || detect_mime(sidecar.filename),
size: Map.get(sidecar.fields, "size", 0),
width: blank_to_nil(Map.get(sidecar.fields, "width")),
height: blank_to_nil(Map.get(sidecar.fields, "height")),
title: Map.get(sidecar.fields, "title"),
alt: Map.get(sidecar.fields, "alt"),
caption: Map.get(sidecar.fields, "caption"),
author: Map.get(sidecar.fields, "author"),
language: Map.get(sidecar.fields, "language"),
file_path: sidecar.relative_file_path,
sidecar_path: sidecar.relative_sidecar_path,
checksum: nil,
tags: Map.get(fields, "tags", []),
created_at: Map.get(fields, "createdAt", now),
updated_at: Map.get(fields, "updatedAt", now)
tags: Map.get(sidecar.fields, "tags", []),
created_at: Map.get(sidecar.fields, "createdAt", now),
updated_at: Map.get(sidecar.fields, "updatedAt", now)
}
media =
Repo.get(Media, attrs.id) ||
Repo.get_by(Media, project_id: project.id, file_path: relative_file_path) || %Media{}
Repo.get_by(Media, project_id: project.id, file_path: sidecar.relative_file_path) || %Media{}
media =
media
|> Media.changeset(attrs)
|> Repo.insert_or_update!()
if Keyword.get(opts, :sync_search, true) do
:ok = Search.sync_media(media)
end
media
|> Media.changeset(attrs)
|> Repo.insert_or_update!()
|> tap(&Search.sync_media/1)
end
defp write_sidecar(project, media) do
@@ -443,17 +453,14 @@ defmodule BDS.Media do
)
end
defp upsert_translation_from_sidecar(project, canonical_media_by_binary_path, sidecar_path) do
binary_path = binary_path_for_translation_sidecar(sidecar_path)
case Map.get(canonical_media_by_binary_path, binary_path) do
defp upsert_translation_from_sidecar(project, canonical_media_by_binary_path, sidecar, opts) do
case Map.get(canonical_media_by_binary_path, sidecar.binary_path) do
nil ->
:skip
media ->
{:ok, fields} = sidecar_path |> File.read!() |> Sidecar.parse_document()
now = Persistence.now_ms()
language = Map.fetch!(fields, "language")
language = Map.fetch!(sidecar.fields, "language")
translation =
Repo.get_by(Translation, translation_for: media.id, language: language) ||
@@ -465,18 +472,42 @@ defmodule BDS.Media do
project_id: project.id,
translation_for: media.id,
language: language,
title: Map.get(fields, "title"),
alt: Map.get(fields, "alt"),
caption: Map.get(fields, "caption"),
title: Map.get(sidecar.fields, "title"),
alt: Map.get(sidecar.fields, "alt"),
caption: Map.get(sidecar.fields, "caption"),
created_at: translation.created_at || now,
updated_at: now
})
|> Repo.insert_or_update!()
:ok = Search.sync_media(media.id)
if Keyword.get(opts, :sync_search, true) do
:ok = Search.sync_media(media.id)
end
end
end
defp parse_canonical_sidecar(project, sidecar_path) do
{:ok, fields} = sidecar_path |> File.read!() |> Sidecar.parse_document()
relative_sidecar_path = Path.relative_to(sidecar_path, Projects.project_data_dir(project))
relative_file_path = String.trim_trailing(relative_sidecar_path, ".meta")
%{
fields: fields,
relative_sidecar_path: relative_sidecar_path,
relative_file_path: relative_file_path,
filename: Path.basename(relative_file_path)
}
end
defp parse_translation_sidecar(sidecar_path) do
{:ok, fields} = sidecar_path |> File.read!() |> Sidecar.parse_document()
%{
fields: fields,
binary_path: binary_path_for_translation_sidecar(sidecar_path)
}
end
defp ensure_thumbnails(project, media) do
if image_mime?(media.mime_type) do
source_path = Path.join(Projects.project_data_dir(project), media.file_path)
@@ -673,4 +704,11 @@ defmodule BDS.Media do
callback.(0.05 + 0.95 * (current / total), "Rebuilding #{label} (#{current}/#{total})")
:ok
end
defp report_rebuild_phase(nil, _progress, _message), do: :ok
defp report_rebuild_phase(callback, progress, message) do
callback.(progress, message)
:ok
end
end

View File

@@ -12,6 +12,7 @@ defmodule BDS.Posts do
alias BDS.Posts.Post
alias BDS.Posts.Translation
alias BDS.Projects
alias BDS.Rebuild
alias BDS.Repo
alias BDS.Search
alias BDS.Slug
@@ -145,7 +146,7 @@ defmodule BDS.Posts do
|> Projects.project_data_dir()
|> Path.join("posts")
|> list_matching_files("*.md")
|> Enum.map(&parse_rebuild_file(project, &1))
|> Rebuild.parallel_map(&parse_rebuild_file(project, &1))
total_files = length(rebuild_files)
:ok = report_rebuild_started(on_progress, total_files, "post files")
@@ -156,7 +157,7 @@ defmodule BDS.Posts do
post_files
|> Enum.with_index(1)
|> Enum.map(fn {file, index} ->
post = upsert_post_from_rebuild_file(project_id, file)
post = upsert_post_from_rebuild_file(project_id, file, sync_search: false, sync_embeddings: false)
:ok = report_rebuild_progress(on_progress, index, total_files, "post files")
post
end)
@@ -164,10 +165,20 @@ defmodule BDS.Posts do
translation_files
|> Enum.with_index(length(post_files) + 1)
|> Enum.each(fn {file, index} ->
upsert_post_translation_from_rebuild_file(project_id, file)
upsert_post_translation_from_rebuild_file(project_id, file, sync_search: false)
:ok = report_rebuild_progress(on_progress, index, total_files, "post files")
end)
if Keyword.get(opts, :reindex_search, true) do
:ok = report_rebuild_phase(on_progress, 0.97, "Refreshing post search index")
:ok = Search.reindex_posts(project_id)
end
if Keyword.get(opts, :rebuild_embeddings, true) do
:ok = report_rebuild_phase(on_progress, 0.99, "Refreshing post embeddings")
{:ok, _rebuilt_post_ids} = Embeddings.rebuild_project(project_id)
end
{:ok, posts}
end
@@ -658,7 +669,7 @@ defmodule BDS.Posts do
upsert_post_from_rebuild_file(project_id, rebuild_file)
end
defp upsert_post_from_rebuild_file(project_id, rebuild_file) do
defp upsert_post_from_rebuild_file(project_id, rebuild_file, opts \\ []) do
fields = rebuild_file.fields
now = Persistence.now_ms()
@@ -693,14 +704,23 @@ defmodule BDS.Posts do
Repo.get_by(Post, project_id: project_id, file_path: rebuild_file.relative_path) ||
Repo.get_by(Post, project_id: project_id, slug: attrs.slug) || %Post{}
post =
post
|> Post.changeset(attrs)
|> Repo.insert_or_update!()
if Keyword.get(opts, :sync_search, true) do
:ok = Search.sync_post(post)
end
if Keyword.get(opts, :sync_embeddings, true) do
:ok = Embeddings.sync_post(post)
end
post
|> Post.changeset(attrs)
|> Repo.insert_or_update!()
|> tap(&Search.sync_post/1)
|> tap(&Embeddings.sync_post/1)
end
defp upsert_post_translation_from_rebuild_file(project_id, rebuild_file) do
defp upsert_post_translation_from_rebuild_file(project_id, rebuild_file, opts) do
fields = rebuild_file.fields
source_post_id = Map.fetch!(fields, "translationFor")
source_post = Repo.get_by!(Post, project_id: project_id, id: source_post_id)
@@ -730,7 +750,9 @@ defmodule BDS.Posts do
|> Translation.changeset(attrs)
|> Repo.insert_or_update!()
|> tap(fn _translation ->
:ok = Search.sync_post(source_post_id)
if Keyword.get(opts, :sync_search, true) do
:ok = Search.sync_post(source_post_id)
end
end)
end
@@ -984,4 +1006,11 @@ defmodule BDS.Posts do
callback.(0.05 + 0.95 * (current / total), "Rebuilding #{label} (#{current}/#{total})")
:ok
end
defp report_rebuild_phase(nil, _progress, _message), do: :ok
defp report_rebuild_phase(callback, progress, message) do
callback.(progress, message)
:ok
end
end

16
lib/bds/rebuild.ex Normal file
View File

@@ -0,0 +1,16 @@
defmodule BDS.Rebuild do
@moduledoc false
def parallel_map(items, mapper, opts \\ []) when is_list(items) and is_function(mapper, 1) do
max_concurrency = Keyword.get(opts, :max_concurrency, System.schedulers_online())
ordered = Keyword.get(opts, :ordered, true)
timeout = Keyword.get(opts, :timeout, :infinity)
items
|> Task.async_stream(mapper, max_concurrency: max_concurrency, ordered: ordered, timeout: timeout)
|> Enum.map(fn
{:ok, item} -> item
{:exit, reason} -> exit(reason)
end)
end
end

View File

@@ -135,7 +135,7 @@ defmodule BDS.Search do
)
Repo.all(from post in Post, where: post.project_id == ^project_id)
|> Enum.each(&sync_post/1)
|> Enum.each(&insert_post_index/1)
:ok
end
@@ -147,21 +147,14 @@ defmodule BDS.Search do
)
Repo.all(from media in Media, where: media.project_id == ^project_id)
|> Enum.each(&sync_media/1)
|> Enum.each(&insert_media_index/1)
:ok
end
def sync_post(%Post{} = post) do
delete_post(post.id)
{title, excerpt, content, tags, categories} = post_index_fields(post)
Repo.query!(
"INSERT INTO posts_fts (post_id, title, excerpt, content, tags, categories) VALUES (?, ?, ?, ?, ?, ?)",
[post.id, title, excerpt, content, tags, categories]
)
insert_post_index(post)
:ok
end
@@ -181,14 +174,7 @@ defmodule BDS.Search do
def sync_media(%Media{} = media) do
delete_media(media.id)
{title, alt, caption, original_name, tags} = media_index_fields(media)
Repo.query!(
"INSERT INTO media_fts (media_id, title, alt, caption, original_name, tags) VALUES (?, ?, ?, ?, ?, ?)",
[media.id, title, alt, caption, original_name, tags]
)
insert_media_index(media)
:ok
end
@@ -206,6 +192,24 @@ defmodule BDS.Search do
:ok
end
defp insert_post_index(%Post{} = post) do
{title, excerpt, content, tags, categories} = post_index_fields(post)
Repo.query!(
"INSERT INTO posts_fts (post_id, title, excerpt, content, tags, categories) VALUES (?, ?, ?, ?, ?, ?)",
[post.id, title, excerpt, content, tags, categories]
)
end
defp insert_media_index(%Media{} = media) do
{title, alt, caption, original_name, tags} = media_index_fields(media)
Repo.query!(
"INSERT INTO media_fts (media_id, title, alt, caption, original_name, tags) VALUES (?, ?, ?, ?, ?, ?)",
[media.id, title, alt, caption, original_name, tags]
)
end
defp candidate_post_ids(project_id, query, language) do
if blank_query?(query) do
Repo.all(from post in Post, where: post.project_id == ^project_id, select: post.id)

View File

@@ -323,10 +323,22 @@ defmodule BDS.Desktop.ShellCommandsTest do
)
assert progressed.group_name == "Maintenance"
assert progressed.message =~ "Rebuilding post files"
assert String.contains?(progressed.message, "post")
assert wait_for_task(progressed.id, &(&1.status == :completed and &1.progress == 1.0), 5_000).status ==
:completed
tasks = wait_for_tasks_by_name([
"Rebuild Posts From Files",
"Rebuild Media From Files",
"Rebuild Scripts From Files",
"Rebuild Templates From Files",
"Rebuild Post Links",
"Regenerate Missing Thumbnails",
"Rebuild Embedding Index"
], &(&1.status == :completed), 20_000)
assert Enum.all?(tasks, &(&1.status == :completed))
end
test "reindex_text queues a tracked background task for the active project", %{project: project} do

View File

@@ -618,6 +618,68 @@ defmodule BDS.PostsTest do
refute BDS.Repo.get(BDS.Posts.Post, stale_post.id)
end
test "rebuild_posts_from_files batches search and embedding refresh after import", %{project: project} do
assert {:ok, _metadata} =
BDS.Metadata.update_project_metadata(project.id, %{semantic_similarity_enabled: true})
posts_dir = Path.join([BDS.Projects.project_data_dir(project), "posts", "2026", "04"])
File.mkdir_p!(posts_dir)
Enum.each(1..3, fn index ->
slug = "batched-post-#{index}"
File.write!(
Path.join(posts_dir, "#{slug}.md"),
[
"---",
"id: #{slug}",
"title: Batched Post #{index}",
"slug: #{slug}",
"status: published",
"language: en",
"createdAt: 1711843200",
"updatedAt: 1711929600",
"publishedAt: 1712016000",
"tags:",
"categories:",
"---",
"Body #{index}",
""
]
|> Enum.join("\n")
)
end)
handler_id = "posts-rebuild-batch-#{System.unique_integer([:positive])}"
:ok =
:telemetry.attach(
handler_id,
[:bds, :repo, :query],
&__MODULE__.handle_repo_query/4,
self()
)
on_exit(fn -> :telemetry.detach(handler_id) end)
assert {:ok, posts} = BDS.Posts.rebuild_posts_from_files(project.id)
assert length(posts) == 3
queries = drain_repo_queries([])
assert count_queries(queries, "settings") <= 12
assert count_queries(queries, "posts_fts") <= 4
assert {:ok, results} = BDS.Search.search_posts(project.id, "Batched Post", %{})
assert results.total == 3
assert {:ok, %{indexed: 3, total: 3}} = BDS.Embeddings.get_indexing_progress(project.id)
end
def handle_repo_query(_event, _measurements, metadata, owner_pid) do
send(owner_pid, {:repo_query, metadata.query || ""})
end
defp errors_on(changeset) do
Ecto.Changeset.traverse_errors(changeset, fn {message, opts} ->
Regex.replace(~r"%{(\w+)}", message, fn _, key ->
@@ -625,4 +687,16 @@ defmodule BDS.PostsTest do
end)
end)
end
defp drain_repo_queries(acc) do
receive do
{:repo_query, query} -> drain_repo_queries([query | acc])
after
0 -> Enum.reverse(acc)
end
end
defp count_queries(queries, fragment) do
Enum.count(queries, &String.contains?(&1, fragment))
end
end