From e3a1010ae9b6812412888ecc76353895a6ebc817 Mon Sep 17 00:00:00 2001 From: Chili Palmer Date: Fri, 12 Jun 2026 11:48:44 +0200 Subject: [PATCH] fix: implement TD-05, replacement of XML parser --- lib/bds/import_execution.ex | 238 +++++++----- lib/bds/posts/rebuild_from_files.ex | 100 +++-- lib/bds/wxr_parser.ex | 500 ++++++++++++++++--------- mix.exs | 1 + test/bds/csm005_sql_filtering_test.exs | 2 +- test/bds/import_execution_test.exs | 60 +++ test/bds/posts_test.exs | 44 +++ test/bds/wxr_parser_test.exs | 25 ++ 8 files changed, 670 insertions(+), 300 deletions(-) diff --git a/lib/bds/import_execution.ex b/lib/bds/import_execution.ex index d0e7a5a..0889650 100644 --- a/lib/bds/import_execution.ex +++ b/lib/bds/import_execution.ex @@ -9,6 +9,8 @@ defmodule BDS.ImportExecution do alias BDS.MapUtils alias BDS.Tags + @transaction_batch_size 500 + def execute_import(project_id, report, opts \\ []) when is_binary(project_id) and is_map(report) do normalized_report = normalize_report(report) @@ -45,54 +47,76 @@ defmodule BDS.ImportExecution do notify_progress(on_progress, "tags", 0, taxonomy_total, "creating_tags", started_at) - result = - execute_taxonomies(category_items, tag_items, project_id, result, on_progress, started_at) + notify_progress(on_progress, "tags", 0, taxonomy_total, "creating_tags", started_at) - notify_progress(on_progress, "posts", 0, length(post_items), "importing_posts", started_at) - - result = - execute_posts( - post_items, - project_id, - default_author, - tag_mapping, - category_mapping, - result, - on_progress, - :posts, - started_at - ) - - notify_progress(on_progress, "media", 0, length(media_items), "importing_media", started_at) - - result = - execute_media( - media_items, - project_id, - default_author, - result, - on_progress, - uploads_folder_path, - started_at - ) - - notify_progress(on_progress, "pages", 0, length(page_items), "importing_pages", started_at) - - result = - execute_posts( - page_items, - project_id, - default_author, - tag_mapping, - category_mapping, - result, - on_progress, - :pages, - started_at - ) - - notify_progress(on_progress, "complete", 1, 1, "import_complete", started_at) - {:ok, result} + with {:ok, after_taxonomies} <- + execute_taxonomies(category_items, tag_items, project_id, result, on_progress, started_at), + :ok <- + notify_progress( + on_progress, + "posts", + 0, + length(post_items), + "importing_posts", + started_at + ), + {:ok, after_posts} <- + execute_posts( + post_items, + project_id, + default_author, + tag_mapping, + category_mapping, + after_taxonomies, + on_progress, + :posts, + started_at + ), + :ok <- + notify_progress( + on_progress, + "media", + 0, + length(media_items), + "importing_media", + started_at + ), + {:ok, after_media} <- + execute_media( + media_items, + project_id, + default_author, + after_posts, + on_progress, + uploads_folder_path, + started_at + ), + :ok <- + notify_progress( + on_progress, + "pages", + 0, + length(page_items), + "importing_pages", + started_at + ), + {:ok, final_result} <- + execute_posts( + page_items, + project_id, + default_author, + tag_mapping, + category_mapping, + after_media, + on_progress, + :pages, + started_at + ) do + notify_progress(on_progress, "complete", 1, 1, "import_complete", started_at) + {:ok, final_result} + else + {:error, reason} -> {:error, %{message: format_import_error(reason)}} + end rescue error -> {:error, %{message: Exception.message(error)}} end @@ -101,9 +125,7 @@ defmodule BDS.ImportExecution do items = category_items ++ tag_items total = length(items) - items - |> Enum.with_index(1) - |> Enum.reduce(result, fn {item, index}, acc -> + transaction_reduce(items, result, fn item, index, acc -> cond do Map.get(item, :exists_in_project) || not is_nil(Map.get(item, :mapped_to)) -> notify_progress( @@ -115,7 +137,7 @@ defmodule BDS.ImportExecution do started_at ) - put_in(acc, [:tags, :skipped], acc.tags.skipped + 1) + {:ok, put_in(acc, [:tags, :skipped], acc.tags.skipped + 1)} true -> case Tags.create_tag(%{project_id: project_id, name: item.name}) do @@ -129,19 +151,10 @@ defmodule BDS.ImportExecution do started_at ) - put_in(acc, [:tags, :created], acc.tags.created + 1) + {:ok, put_in(acc, [:tags, :created], acc.tags.created + 1)} - {:error, _reason} -> - notify_progress( - on_progress, - "tags", - index, - total, - "skipped_tag:#{item.name}", - started_at - ) - - put_in(acc, [:tags, :skipped], acc.tags.skipped + 1) + {:error, reason} -> + {:error, {:taxonomy_import_failed, item.name, reason}} end end end) @@ -161,8 +174,7 @@ defmodule BDS.ImportExecution do total = length(items) phase = Atom.to_string(bucket) - Enum.with_index(items, 1) - |> Enum.reduce(result, fn {item, index}, acc -> + transaction_reduce(items, result, fn item, index, acc -> notify_progress(on_progress, phase, index, total, "processing:#{item.title}", started_at) execute_post_item( @@ -188,9 +200,7 @@ defmodule BDS.ImportExecution do ) do total = length(items) - items - |> Enum.with_index(1) - |> Enum.reduce(result, fn {item, index}, acc -> + transaction_reduce(items, result, fn item, index, acc -> notify_progress( on_progress, "media", @@ -202,24 +212,21 @@ defmodule BDS.ImportExecution do cond do item.status == "missing" -> - put_in(acc, [:media, :skipped], acc.media.skipped + 1) + {:ok, put_in(acc, [:media, :skipped], acc.media.skipped + 1)} item.status in ["update", "content-duplicate", "duplicate"] -> - put_in(acc, [:media, :skipped], acc.media.skipped + 1) + {:ok, put_in(acc, [:media, :skipped], acc.media.skipped + 1)} item.status == "conflict" and resolve_conflict(item) == "ignore" -> - put_in(acc, [:media, :skipped], acc.media.skipped + 1) + {:ok, put_in(acc, [:media, :skipped], acc.media.skipped + 1)} true -> case import_media_item(project_id, item, default_author, uploads_folder_path, acc) do {:ok, _media} -> - put_in(acc, [:media, :imported], acc.media.imported + 1) + {:ok, put_in(acc, [:media, :imported], acc.media.imported + 1)} {:error, reason} -> - acc - |> put_in([:media, :errors], acc.media.errors + 1) - |> Map.update!(:errors, &(&1 ++ [inspect(reason)])) - |> Map.put(:success, false) + {:error, {:media_import_failed, item.filename, reason}} end end end) @@ -236,37 +243,33 @@ defmodule BDS.ImportExecution do ) do cond do item.status in ["update", "content-duplicate", "duplicate"] -> - put_in(result, [bucket, :skipped], get_in(result, [bucket, :skipped]) + 1) + {:ok, put_in(result, [bucket, :skipped], get_in(result, [bucket, :skipped]) + 1)} item.status == "conflict" and resolve_conflict(item) == "ignore" -> - put_in(result, [bucket, :skipped], get_in(result, [bucket, :skipped]) + 1) + {:ok, put_in(result, [bucket, :skipped], get_in(result, [bucket, :skipped]) + 1)} item.status == "conflict" and resolve_conflict(item) == "overwrite" -> case overwrite_post_item(item, default_author, tag_mapping, category_mapping) do {:ok, post} -> - result - |> put_in([bucket, :imported], get_in(result, [bucket, :imported]) + 1) - |> track_wp_id(item, post) + {:ok, + result + |> put_in([bucket, :imported], get_in(result, [bucket, :imported]) + 1) + |> track_wp_id(item, post)} {:error, reason} -> - result - |> put_in([bucket, :errors], get_in(result, [bucket, :errors]) + 1) - |> Map.update!(:errors, &(&1 ++ [inspect(reason)])) - |> Map.put(:success, false) + {:error, {:post_import_failed, bucket, post_item_label(item), reason}} end true -> case create_post_item(project_id, item, default_author, tag_mapping, category_mapping) do {:ok, post} -> - result - |> put_in([bucket, :imported], get_in(result, [bucket, :imported]) + 1) - |> track_wp_id(item, post) + {:ok, + result + |> put_in([bucket, :imported], get_in(result, [bucket, :imported]) + 1) + |> track_wp_id(item, post)} {:error, reason} -> - result - |> put_in([bucket, :errors], get_in(result, [bucket, :errors]) + 1) - |> Map.update!(:errors, &(&1 ++ [inspect(reason)])) - |> Map.put(:success, false) + {:error, {:post_import_failed, bucket, post_item_label(item), reason}} end end end @@ -557,6 +560,57 @@ defmodule BDS.ImportExecution do defp parse_timestamp(_value), do: nil + defp transaction_reduce(items, initial_acc, reducer) do + items + |> Enum.chunk_every(@transaction_batch_size) + |> Enum.reduce_while({:ok, initial_acc, 0}, fn chunk, {:ok, acc, processed} -> + case run_repo_transaction(fn -> + Enum.with_index(chunk, 1) + |> Enum.reduce_while(acc, fn {item, index}, chunk_acc -> + case reducer.(item, processed + index, chunk_acc) do + {:ok, next_acc} -> {:cont, next_acc} + {:error, reason} -> Repo.rollback(reason) + end + end) + end) do + {:ok, next_acc} -> {:cont, {:ok, next_acc, processed + length(chunk)}} + {:error, reason} -> {:halt, {:error, reason}} + end + end) + |> case do + {:ok, acc, _processed} -> {:ok, acc} + {:error, reason} -> {:error, reason} + end + end + + defp run_repo_transaction(fun) when is_function(fun, 0) do + Repo.transaction(fun) + rescue + error -> {:error, error} + catch + kind, reason -> {:error, {kind, reason}} + end + + defp post_item_label(item) do + Map.get(item, :slug) || Map.get(item, :title) || inspect(Map.get(item, :wp_id)) + end + + defp format_import_error({:post_import_failed, bucket, label, reason}) do + "#{bucket} import failed for #{label}: #{inspect(reason)}" + end + + defp format_import_error({:media_import_failed, filename, reason}) do + "media import failed for #{filename}: #{inspect(reason)}" + end + + defp format_import_error({:taxonomy_import_failed, name, reason}) do + "tag import failed for #{name}: #{inspect(reason)}" + end + + defp format_import_error(%_{} = error), do: Exception.message(error) + defp format_import_error({kind, reason}), do: "#{kind}: #{inspect(reason)}" + defp format_import_error(reason), do: inspect(reason) + defp uploads_source_path(relative_path, uploads_folder_path) defp uploads_source_path(relative_path, uploads_folder_path) diff --git a/lib/bds/posts/rebuild_from_files.ex b/lib/bds/posts/rebuild_from_files.ex index 0519a0b..81a122b 100644 --- a/lib/bds/posts/rebuild_from_files.ex +++ b/lib/bds/posts/rebuild_from_files.ex @@ -15,6 +15,8 @@ defmodule BDS.Posts.RebuildFromFiles do alias BDS.Repo alias BDS.Search + @transaction_batch_size 500 + @spec rebuild_posts_from_files(String.t(), keyword()) :: {:ok, [Post.t()]} | {:error, term()} def rebuild_posts_from_files(project_id, opts \\ []) do project = Projects.get_project!(project_id) @@ -34,46 +36,29 @@ defmodule BDS.Posts.RebuildFromFiles do {translation_files, post_files} = Enum.split_with(rebuild_files, &TranslationValidation.translation_rebuild_file?/1) - posts = - post_files - |> Enum.with_index(1) - |> Enum.map(fn {file, index} -> - post = - upsert_post_from_rebuild_file(project_id, file, - sync_search: false, - sync_embeddings: false + operations = Enum.map(post_files, &{:post, &1}) ++ Enum.map(translation_files, &{:translation, &1}) + + with {:ok, posts} <- persist_rebuild_operations(project_id, operations, total_files, on_progress) do + if Keyword.get(opts, :reindex_search, true) do + :ok = report_rebuild_phase(on_progress, 0.97, "Refreshing post search index") + + :ok = + Search.reindex_posts(project_id, + on_progress: scaled_progress_reporter(on_progress, 0.97, 0.99) ) + end - :ok = report_rebuild_progress(on_progress, index, total_files, "post files") - post - end) + if Keyword.get(opts, :rebuild_embeddings, true) do + :ok = report_rebuild_phase(on_progress, 0.99, "Refreshing post embeddings") - translation_files - |> Enum.with_index(length(post_files) + 1) - |> Enum.each(fn {file, index} -> - upsert_post_translation_from_rebuild_file(project_id, file, sync_search: false) - :ok = report_rebuild_progress(on_progress, index, total_files, "post files") - end) + {:ok, _rebuilt_post_ids} = + Embeddings.rebuild_project(project_id, + on_progress: scaled_progress_reporter(on_progress, 0.99, 1.0) + ) + end - if Keyword.get(opts, :reindex_search, true) do - :ok = report_rebuild_phase(on_progress, 0.97, "Refreshing post search index") - - :ok = - Search.reindex_posts(project_id, - on_progress: scaled_progress_reporter(on_progress, 0.97, 0.99) - ) + {:ok, posts} end - - if Keyword.get(opts, :rebuild_embeddings, true) do - :ok = report_rebuild_phase(on_progress, 0.99, "Refreshing post embeddings") - - {:ok, _rebuilt_post_ids} = - Embeddings.rebuild_project(project_id, - on_progress: scaled_progress_reporter(on_progress, 0.99, 1.0) - ) - end - - {:ok, posts} end end @@ -314,4 +299,49 @@ defmodule BDS.Posts.RebuildFromFiles do {:error, reason} -> {:error, {:read_rebuild_file, path, reason}} end end + + defp persist_rebuild_operations(project_id, operations, total_files, on_progress) do + operations + |> Enum.chunk_every(@transaction_batch_size) + |> Enum.reduce_while({:ok, [], 0}, fn chunk, {:ok, posts, processed} -> + case run_repo_transaction(fn -> + Enum.map(chunk, fn + {:post, file} -> + {:post, + upsert_post_from_rebuild_file(project_id, file, + sync_search: false, + sync_embeddings: false + )} + + {:translation, file} -> + {:translation, + upsert_post_translation_from_rebuild_file(project_id, file, sync_search: false)} + end) + end) do + {:ok, committed} -> + Enum.with_index(committed, processed + 1) + |> Enum.each(fn {_entry, index} -> + :ok = report_rebuild_progress(on_progress, index, total_files, "post files") + end) + + chunk_posts = for {:post, post} <- committed, do: post + {:cont, {:ok, posts ++ chunk_posts, processed + length(chunk)}} + + {:error, reason} -> + {:halt, {:error, reason}} + end + end) + |> case do + {:ok, posts, _processed} -> {:ok, posts} + {:error, reason} -> {:error, reason} + end + end + + defp run_repo_transaction(fun) when is_function(fun, 0) do + Repo.transaction(fun) + rescue + error -> {:error, error} + catch + kind, reason -> {:error, {kind, reason}} + end end diff --git a/lib/bds/wxr_parser.ex b/lib/bds/wxr_parser.ex index 687d989..1d0052a 100644 --- a/lib/bds/wxr_parser.ex +++ b/lib/bds/wxr_parser.ex @@ -1,143 +1,362 @@ defmodule BDS.WxrParser do @moduledoc false - require Record + defmodule ParserState do + @moduledoc false - Record.defrecord(:xmlElement, Record.extract(:xmlElement, from_lib: "xmerl/include/xmerl.hrl")) - - Record.defrecord( - :xmlAttribute, - Record.extract(:xmlAttribute, from_lib: "xmerl/include/xmerl.hrl") - ) - - Record.defrecord(:xmlText, Record.extract(:xmlText, from_lib: "xmerl/include/xmerl.hrl")) - - def parse_file(file_path) when is_binary(file_path) do - file_path - |> File.read!() - |> parse_xml() + defstruct stack: [], + channel_seen?: false, + site: %{title: "", link: "", description: "", language: ""}, + categories: [], + tags: [], + items: [], + current_category: nil, + current_tag: nil, + current_item: nil, + current_taxonomy: nil, + text: "" end - def parse_xml(xml_content) when is_binary(xml_content) do - {document, _rest} = :xmerl_scan.string(String.to_charlist(xml_content)) + defmodule Handler do + @moduledoc false - case :xmerl_xpath.string(~c"/rss/channel", document) do - [channel] -> - %{ - site: parse_site(channel), - posts: parse_post_like_items(channel), - pages: parse_items(channel, "page"), - media: parse_media(channel), - categories: parse_categories(channel), - tags: parse_tags(channel) - } + @behaviour Saxy.Handler - _other -> - raise RuntimeError, "Invalid WXR file: no element found" + alias BDS.WxrParser.ParserState + + def handle_event(:start_document, _prolog, state), do: {:ok, state} + + def handle_event(:end_document, _data, state), do: {:ok, state} + + def handle_event(:characters, chars, state) do + {:ok, %{state | text: state.text <> chars}} + end + + def handle_event(:start_element, {name, attributes}, state) do + parent = current_name(state) + + state = + state + |> push_name(name) + |> reset_text() + |> maybe_start_channel(parent, name) + |> maybe_start_category(parent, name) + |> maybe_start_tag(parent, name) + |> maybe_start_item(parent, name) + |> maybe_start_item_taxonomy(parent, name, attributes) + + {:ok, state} + end + + def handle_event(:end_element, name, state) do + parent = parent_name(state) + text = String.trim(state.text) + + state = + state + |> maybe_capture_site_field(parent, name, text) + |> maybe_capture_category_field(parent, name, text) + |> maybe_finish_category(parent, name) + |> maybe_capture_tag_field(parent, name, text) + |> maybe_finish_tag(parent, name) + |> maybe_capture_item_field(parent, name, text) + |> maybe_finish_item_taxonomy(parent, name, text) + |> maybe_finish_item(parent, name) + |> pop_name() + |> reset_text() + + {:ok, state} + end + + defp current_name(%ParserState{stack: [name | _rest]}), do: name + defp current_name(%ParserState{}), do: nil + + defp parent_name(%ParserState{stack: [_current, parent | _rest]}), do: parent + defp parent_name(%ParserState{}), do: nil + + defp push_name(state, name), do: %{state | stack: [name | state.stack]} + + defp pop_name(%ParserState{stack: [_name | rest]} = state), do: %{state | stack: rest} + defp pop_name(state), do: state + + defp reset_text(state), do: %{state | text: ""} + + defp maybe_start_channel(state, "rss", "channel"), do: %{state | channel_seen?: true} + defp maybe_start_channel(state, _parent, _name), do: state + + defp maybe_start_category(state, "channel", "wp:category") do + %{state | current_category: %{name: "", slug: "", parent: ""}} + end + + defp maybe_start_category(state, _parent, _name), do: state + + defp maybe_start_tag(state, "channel", "wp:tag") do + %{state | current_tag: %{name: "", slug: ""}} + end + + defp maybe_start_tag(state, _parent, _name), do: state + + defp maybe_start_item(state, "channel", "item") do + %{state | current_item: empty_item()} + end + + defp maybe_start_item(state, _parent, _name), do: state + + defp maybe_start_item_taxonomy(state, "item", "category", attributes) do + %{state | current_taxonomy: %{domain: attribute_value(attributes, "domain")}} + end + + defp maybe_start_item_taxonomy(state, _parent, _name, _attributes), do: state + + defp maybe_capture_site_field( + %ParserState{current_category: nil, current_tag: nil, current_item: nil} = state, + "channel", + name, + text + ) do + case name do + "title" -> put_in(state.site.title, text) + "link" -> put_in(state.site.link, text) + "description" -> put_in(state.site.description, text) + "language" -> put_in(state.site.language, text) + _other -> state + end + end + + defp maybe_capture_site_field(state, _parent, _name, _text), do: state + + defp maybe_capture_category_field(%ParserState{current_category: nil} = state, _parent, _name, _text), + do: state + + defp maybe_capture_category_field(%ParserState{} = state, "wp:category", name, text) do + key = + case name do + "wp:cat_name" -> :name + "wp:category_nicename" -> :slug + "wp:category_parent" -> :parent + _other -> nil + end + + if key do + update_in(state.current_category, &Map.put(&1, key, text)) + else + state + end + end + + defp maybe_capture_category_field(state, _parent, _name, _text), do: state + + defp maybe_finish_category(%ParserState{current_category: nil} = state, _parent, _name), do: state + + defp maybe_finish_category(%ParserState{} = state, "channel", "wp:category") do + %{state | categories: [state.current_category | state.categories], current_category: nil} + end + + defp maybe_finish_category(state, _parent, _name), do: state + + defp maybe_capture_tag_field(%ParserState{current_tag: nil} = state, _parent, _name, _text), + do: state + + defp maybe_capture_tag_field(%ParserState{} = state, "wp:tag", name, text) do + key = + case name do + "wp:tag_name" -> :name + "wp:tag_slug" -> :slug + _other -> nil + end + + if key do + update_in(state.current_tag, &Map.put(&1, key, text)) + else + state + end + end + + defp maybe_capture_tag_field(state, _parent, _name, _text), do: state + + defp maybe_finish_tag(%ParserState{current_tag: nil} = state, _parent, _name), do: state + + defp maybe_finish_tag(%ParserState{} = state, "channel", "wp:tag") do + %{state | tags: [state.current_tag | state.tags], current_tag: nil} + end + + defp maybe_finish_tag(state, _parent, _name), do: state + + defp maybe_capture_item_field(%ParserState{current_item: nil} = state, _parent, _name, _text), + do: state + + defp maybe_capture_item_field(%ParserState{} = state, "item", name, text) do + key = + case name do + "title" -> :title + "pubDate" -> :pub_date + "dc:creator" -> :creator + "content:encoded" -> :content + "excerpt:encoded" -> :excerpt + "wp:post_id" -> :post_id + "wp:post_date" -> :post_date + "wp:post_modified" -> :post_modified + "wp:post_name" -> :post_name + "wp:status" -> :status + "wp:post_type" -> :post_type + "wp:post_parent" -> :post_parent + "wp:attachment_url" -> :attachment_url + _other -> nil + end + + if key do + update_in(state.current_item, &Map.put(&1, key, text)) + else + state + end + end + + defp maybe_capture_item_field(state, _parent, _name, _text), do: state + + defp maybe_finish_item_taxonomy( + %ParserState{current_item: nil, current_taxonomy: nil} = state, + _parent, + _name, + _text + ), + do: state + + defp maybe_finish_item_taxonomy(%ParserState{current_taxonomy: nil} = state, _parent, _name, _text), + do: state + + defp maybe_finish_item_taxonomy(%ParserState{} = state, "item", "category", text) do + domain = Map.get(state.current_taxonomy, :domain) + + state = + cond do + text == "" -> state + domain == "category" -> update_in(state.current_item.categories, &(&1 ++ [text])) + domain == "post_tag" -> update_in(state.current_item.tags, &(&1 ++ [text])) + true -> state + end + + %{state | current_taxonomy: nil} + end + + defp maybe_finish_item_taxonomy(state, _parent, _name, _text), do: state + + defp maybe_finish_item(%ParserState{current_item: nil} = state, _parent, _name), do: state + + defp maybe_finish_item(%ParserState{} = state, "channel", "item") do + %{state | items: [state.current_item | state.items], current_item: nil} + end + + defp maybe_finish_item(state, _parent, _name), do: state + + defp empty_item do + %{ + post_id: "", + title: "", + post_name: "", + content: "", + excerpt: "", + pub_date: "", + post_date: "", + post_modified: "", + creator: "", + status: "", + post_type: "", + post_parent: "", + attachment_url: "", + categories: [], + tags: [] + } + end + + defp attribute_value(attributes, name) do + Enum.find_value(attributes, fn + {^name, value} -> value + _other -> nil + end) end end - defp parse_site(channel) do + def parse_file(file_path) when is_binary(file_path) do + file_path + |> File.stream!(32_768, []) + |> parse_document(&Saxy.parse_stream(&1, Handler, %ParserState{})) + end + + def parse_xml(xml_content) when is_binary(xml_content) do + xml_content + |> parse_document(&Saxy.parse_string(&1, Handler, %ParserState{})) + end + + defp parse_document(input, parser) do + case parser.(input) do + {:ok, %ParserState{channel_seen?: true} = state} -> + build_result(state) + + {:ok, %ParserState{channel_seen?: false}} -> + raise RuntimeError, "Invalid WXR file: no element found" + + {:error, error} -> + raise RuntimeError, Exception.message(error) + end + end + + defp build_result(%ParserState{} = state) do + items = Enum.reverse(state.items) + %{ - title: child_text(channel, "title"), - link: child_text(channel, "link"), - description: child_text(channel, "description"), - language: child_text(channel, "language") + site: state.site, + posts: + items + |> Enum.filter(fn item -> item.post_type not in ["", "attachment", "page"] end) + |> Enum.map(&parse_post_item/1), + pages: + items + |> Enum.filter(&(&1.post_type == "page")) + |> Enum.map(&parse_post_item/1), + media: + items + |> Enum.filter(&(&1.post_type == "attachment")) + |> Enum.map(&parse_media_item/1), + categories: Enum.reverse(state.categories), + tags: Enum.reverse(state.tags) } end - defp parse_categories(channel) do - channel - |> direct_children() - |> Enum.filter(&(full_name(&1) == "wp:category")) - |> Enum.map(fn element -> - %{ - name: child_text(element, "cat_name"), - slug: child_text(element, "category_nicename"), - parent: child_text(element, "category_parent") - } - end) - end - - defp parse_tags(channel) do - channel - |> direct_children() - |> Enum.filter(&(full_name(&1) == "wp:tag")) - |> Enum.map(fn element -> - %{ - name: child_text(element, "tag_name"), - slug: child_text(element, "tag_slug") - } - end) - end - - defp parse_items(channel, expected_type) do - channel - |> direct_children_named("item") - |> Enum.filter(&(child_text(&1, "post_type") == expected_type)) - |> Enum.map(&parse_post_item/1) - end - - defp parse_post_like_items(channel) do - channel - |> direct_children_named("item") - |> Enum.filter(fn item -> - type = child_text(item, "post_type") - type not in ["", "attachment", "page"] - end) - |> Enum.map(&parse_post_item/1) - end - - defp parse_media(channel) do - channel - |> direct_children_named("item") - |> Enum.filter(&(child_text(&1, "post_type") == "attachment")) - |> Enum.map(&parse_media_item/1) - end - defp parse_post_item(item) do %{ - wp_id: parse_integer(child_text(item, "post_id")), - title: child_text(item, "title"), - slug: child_text(item, "post_name"), - content: child_text_by_full_name(item, "content:encoded"), - excerpt: child_text_by_full_name(item, "excerpt:encoded"), - pub_date: blank_to_nil(child_text(item, "pubDate")), - post_date: blank_to_nil(child_text(item, "post_date")), - post_modified: blank_to_nil(child_text(item, "post_modified")), - creator: child_text_by_full_name(item, "dc:creator"), - status: child_text(item, "status"), - post_type: child_text(item, "post_type"), - categories: item_taxonomy(item, "category"), - tags: item_taxonomy(item, "post_tag") + wp_id: parse_integer(item.post_id), + title: item.title, + slug: item.post_name, + content: item.content, + excerpt: item.excerpt, + pub_date: blank_to_nil(item.pub_date), + post_date: blank_to_nil(item.post_date), + post_modified: blank_to_nil(item.post_modified), + creator: item.creator, + status: item.status, + post_type: item.post_type, + categories: Enum.reject(item.categories, &(&1 == "")), + tags: Enum.reject(item.tags, &(&1 == "")) } end defp parse_media_item(item) do - attachment_url = child_text(item, "attachment_url") + attachment_url = item.attachment_url filename = attachment_url |> Path.basename() |> blank_to_nil() || "" %{ - wp_id: parse_integer(child_text(item, "post_id")), - title: child_text(item, "title"), + wp_id: parse_integer(item.post_id), + title: item.title, url: attachment_url, filename: filename, relative_path: relative_upload_path(attachment_url), - pub_date: blank_to_nil(child_text(item, "pubDate")), - parent_id: parse_integer(child_text(item, "post_parent")), + pub_date: blank_to_nil(item.pub_date), + parent_id: parse_integer(item.post_parent), mime_type: MIME.from_path(filename), - description: child_text_by_full_name(item, "content:encoded") + description: item.content } end - defp item_taxonomy(item, domain) do - item - |> direct_children_named("category") - |> Enum.filter(&(xml_attr(&1, :domain) == domain)) - |> Enum.map(&text_content/1) - |> Enum.reject(&(&1 == "")) - end - defp relative_upload_path(url) when is_binary(url) do marker = "/wp-content/uploads/" @@ -147,69 +366,6 @@ defmodule BDS.WxrParser do end end - defp direct_children(element) do - Enum.filter(xmlElement(element, :content), fn child -> - is_tuple(child) and tuple_size(child) > 0 and elem(child, 0) == :xmlElement - end) - end - - defp direct_children_named(element, name) do - Enum.filter(direct_children(element), &(local_name(&1) == name)) - end - - defp child_text(element, name) do - element - |> direct_children_named(name) - |> List.first() - |> text_content() - end - - defp child_text_by_full_name(element, name) do - element - |> direct_children() - |> Enum.find(&(full_name(&1) == name)) - |> text_content() - end - - defp text_content(nil), do: "" - - defp text_content(element) do - element - |> xmlElement(:content) - |> Enum.map_join("", fn - child when is_tuple(child) and tuple_size(child) > 0 and elem(child, 0) == :xmlText -> - child - |> xmlText(:value) - |> to_string() - - child when is_tuple(child) and tuple_size(child) > 0 and elem(child, 0) == :xmlElement -> - text_content(child) - - _other -> - "" - end) - |> String.trim() - end - - defp xml_attr(element, name) do - element - |> xmlElement(:attributes) - |> Enum.find_value(fn attribute -> - if xmlAttribute(attribute, :name) == name do - attribute |> xmlAttribute(:value) |> to_string() - end - end) - end - - defp full_name(element), do: element |> xmlElement(:name) |> to_string() - - defp local_name(element) do - element - |> full_name() - |> String.split(":") - |> List.last() - end - defp parse_integer(value) do case Integer.parse(to_string(value)) do {parsed, _rest} -> parsed diff --git a/mix.exs b/mix.exs index dc0bd35..dffce71 100644 --- a/mix.exs +++ b/mix.exs @@ -33,6 +33,7 @@ defmodule BDS.MixProject do {:plug, "~> 1.18"}, {:bandit, "~> 1.5"}, {:req, "~> 0.5"}, + {:saxy, "~> 1.4"}, {:desktop, "~> 1.5"}, {:image, "~> 0.67"}, {:nx, "~> 0.10"}, diff --git a/test/bds/csm005_sql_filtering_test.exs b/test/bds/csm005_sql_filtering_test.exs index 3e05dfe..7fe5a81 100644 --- a/test/bds/csm005_sql_filtering_test.exs +++ b/test/bds/csm005_sql_filtering_test.exs @@ -331,7 +331,7 @@ defmodule BDS.CSM005SQLFilteringTest do # Helpers # --------------------------------------------------------------------------- - defp create_post(project_id, opts \\ []) do + defp create_post(project_id, opts) do title = Keyword.get(opts, :title, "Post #{System.unique_integer([:positive])}") status = Keyword.get(opts, :status, :draft) tags = Keyword.get(opts, :tags, []) diff --git a/test/bds/import_execution_test.exs b/test/bds/import_execution_test.exs index 5fc46be..ebcae80 100644 --- a/test/bds/import_execution_test.exs +++ b/test/bds/import_execution_test.exs @@ -187,6 +187,66 @@ defmodule BDS.ImportExecutionTest do assert_received {:execution_progress, "complete", 1, 1, %{detail: "import_complete"}} end + test "execute_import rolls back imported posts when a later post write fails", %{ + project: project + } do + report = %{ + items: %{ + categories: [], + tags: [], + posts: [ + %{ + item_type: "post", + post_type: "post", + wp_id: 101, + title: "Committed Too Early", + slug: "committed-too-early", + status: "new", + author: nil, + excerpt: nil, + categories: [], + tags: [], + wp_status: "publish", + content_markdown: "first body", + content_checksum: sha256("first body"), + created_at: "2024-05-01 12:00:00", + updated_at: "2024-05-01 12:30:00", + published_at: "2024-05-01 12:00:00" + }, + %{ + item_type: "post", + post_type: "post", + wp_id: 102, + title: "Broken Overwrite", + slug: "broken-overwrite", + status: "conflict", + resolution: "overwrite", + existing_id: "missing-post-id", + author: nil, + excerpt: nil, + categories: [], + tags: [], + wp_status: "publish", + content_markdown: "second body", + content_checksum: sha256("second body"), + created_at: "2024-05-02 12:00:00", + updated_at: "2024-05-02 12:30:00", + published_at: "2024-05-02 12:00:00" + } + ], + pages: [], + media: [] + }, + details: %{posts: [], pages: [], media: []} + } + + assert {:error, %{message: message}} = + ImportExecution.execute_import(project.id, report, default_author: "Imported Author") + + assert message =~ "not_found" + assert Repo.aggregate(Posts.Post, :count, :id) == 0 + end + defp sha256(value) do :sha256 |> :crypto.hash(value) diff --git a/test/bds/posts_test.exs b/test/bds/posts_test.exs index 5cf9745..8c9420c 100644 --- a/test/bds/posts_test.exs +++ b/test/bds/posts_test.exs @@ -802,6 +802,50 @@ defmodule BDS.PostsTest do BDS.Posts.rebuild_posts_from_files(project.id) end + test "rebuild_posts_from_files rolls back created posts when a later translation import fails", %{ + project: project + } do + posts_dir = Path.join([BDS.Projects.project_data_dir(project), "posts", "2026", "04"]) + File.mkdir_p!(posts_dir) + + File.write!( + Path.join(posts_dir, "chimera.md"), + [ + "---", + "id: canonical-post-id", + "title: Chimera Source", + "slug: chimera", + "status: published", + "language: de", + "createdAt: 2024-03-30T21:20:00.000Z", + "updatedAt: 2024-03-31T21:20:00.000Z", + "publishedAt: 2024-04-01T21:20:00.000Z", + "---", + "Quelle", + "" + ] + |> Enum.join("\n") + ) + + File.write!( + Path.join(posts_dir, "chimera.en.md"), + [ + "---", + "id: broken-translation-id", + "translationFor: missing-source-post-id", + "language: en", + "title: Broken Translation", + "---", + "Translated body", + "" + ] + |> Enum.join("\n") + ) + + assert {:error, _reason} = BDS.Posts.rebuild_posts_from_files(project.id) + assert BDS.Repo.aggregate(BDS.Posts.Post, :count, :id) == 0 + end + test "rebuild_posts_from_files parses folded multiline title and slug scalars alongside translations" do temp_dir = Path.join( diff --git a/test/bds/wxr_parser_test.exs b/test/bds/wxr_parser_test.exs index e3d5348..c85ba08 100644 --- a/test/bds/wxr_parser_test.exs +++ b/test/bds/wxr_parser_test.exs @@ -53,6 +53,31 @@ defmodule BDS.WxrParserTest do end end + test "parse_xml does not intern unknown element names as atoms" do + unique_name = "csm036_untrusted_#{System.unique_integer([:positive])}" + + xml = """ + + + + Legacy Blog + <#{unique_name}>ignored payload + + + """ + + parsed = WxrParser.parse_xml(xml) + assert parsed.site.title == "Legacy Blog" + + assert_raise ArgumentError, fn -> + String.to_existing_atom(unique_name) + end + end + defp sample_wxr_xml do """