fix: implement TD-05, replacement of XML parser
This commit is contained in:
@@ -9,6 +9,8 @@ defmodule BDS.ImportExecution do
|
||||
alias BDS.MapUtils
|
||||
alias BDS.Tags
|
||||
|
||||
@transaction_batch_size 500
|
||||
|
||||
def execute_import(project_id, report, opts \\ [])
|
||||
when is_binary(project_id) and is_map(report) do
|
||||
normalized_report = normalize_report(report)
|
||||
@@ -45,54 +47,76 @@ defmodule BDS.ImportExecution do
|
||||
|
||||
notify_progress(on_progress, "tags", 0, taxonomy_total, "creating_tags", started_at)
|
||||
|
||||
result =
|
||||
execute_taxonomies(category_items, tag_items, project_id, result, on_progress, started_at)
|
||||
notify_progress(on_progress, "tags", 0, taxonomy_total, "creating_tags", started_at)
|
||||
|
||||
notify_progress(on_progress, "posts", 0, length(post_items), "importing_posts", started_at)
|
||||
|
||||
result =
|
||||
execute_posts(
|
||||
post_items,
|
||||
project_id,
|
||||
default_author,
|
||||
tag_mapping,
|
||||
category_mapping,
|
||||
result,
|
||||
on_progress,
|
||||
:posts,
|
||||
started_at
|
||||
)
|
||||
|
||||
notify_progress(on_progress, "media", 0, length(media_items), "importing_media", started_at)
|
||||
|
||||
result =
|
||||
execute_media(
|
||||
media_items,
|
||||
project_id,
|
||||
default_author,
|
||||
result,
|
||||
on_progress,
|
||||
uploads_folder_path,
|
||||
started_at
|
||||
)
|
||||
|
||||
notify_progress(on_progress, "pages", 0, length(page_items), "importing_pages", started_at)
|
||||
|
||||
result =
|
||||
execute_posts(
|
||||
page_items,
|
||||
project_id,
|
||||
default_author,
|
||||
tag_mapping,
|
||||
category_mapping,
|
||||
result,
|
||||
on_progress,
|
||||
:pages,
|
||||
started_at
|
||||
)
|
||||
|
||||
notify_progress(on_progress, "complete", 1, 1, "import_complete", started_at)
|
||||
{:ok, result}
|
||||
with {:ok, after_taxonomies} <-
|
||||
execute_taxonomies(category_items, tag_items, project_id, result, on_progress, started_at),
|
||||
:ok <-
|
||||
notify_progress(
|
||||
on_progress,
|
||||
"posts",
|
||||
0,
|
||||
length(post_items),
|
||||
"importing_posts",
|
||||
started_at
|
||||
),
|
||||
{:ok, after_posts} <-
|
||||
execute_posts(
|
||||
post_items,
|
||||
project_id,
|
||||
default_author,
|
||||
tag_mapping,
|
||||
category_mapping,
|
||||
after_taxonomies,
|
||||
on_progress,
|
||||
:posts,
|
||||
started_at
|
||||
),
|
||||
:ok <-
|
||||
notify_progress(
|
||||
on_progress,
|
||||
"media",
|
||||
0,
|
||||
length(media_items),
|
||||
"importing_media",
|
||||
started_at
|
||||
),
|
||||
{:ok, after_media} <-
|
||||
execute_media(
|
||||
media_items,
|
||||
project_id,
|
||||
default_author,
|
||||
after_posts,
|
||||
on_progress,
|
||||
uploads_folder_path,
|
||||
started_at
|
||||
),
|
||||
:ok <-
|
||||
notify_progress(
|
||||
on_progress,
|
||||
"pages",
|
||||
0,
|
||||
length(page_items),
|
||||
"importing_pages",
|
||||
started_at
|
||||
),
|
||||
{:ok, final_result} <-
|
||||
execute_posts(
|
||||
page_items,
|
||||
project_id,
|
||||
default_author,
|
||||
tag_mapping,
|
||||
category_mapping,
|
||||
after_media,
|
||||
on_progress,
|
||||
:pages,
|
||||
started_at
|
||||
) do
|
||||
notify_progress(on_progress, "complete", 1, 1, "import_complete", started_at)
|
||||
{:ok, final_result}
|
||||
else
|
||||
{:error, reason} -> {:error, %{message: format_import_error(reason)}}
|
||||
end
|
||||
rescue
|
||||
error -> {:error, %{message: Exception.message(error)}}
|
||||
end
|
||||
@@ -101,9 +125,7 @@ defmodule BDS.ImportExecution do
|
||||
items = category_items ++ tag_items
|
||||
total = length(items)
|
||||
|
||||
items
|
||||
|> Enum.with_index(1)
|
||||
|> Enum.reduce(result, fn {item, index}, acc ->
|
||||
transaction_reduce(items, result, fn item, index, acc ->
|
||||
cond do
|
||||
Map.get(item, :exists_in_project) || not is_nil(Map.get(item, :mapped_to)) ->
|
||||
notify_progress(
|
||||
@@ -115,7 +137,7 @@ defmodule BDS.ImportExecution do
|
||||
started_at
|
||||
)
|
||||
|
||||
put_in(acc, [:tags, :skipped], acc.tags.skipped + 1)
|
||||
{:ok, put_in(acc, [:tags, :skipped], acc.tags.skipped + 1)}
|
||||
|
||||
true ->
|
||||
case Tags.create_tag(%{project_id: project_id, name: item.name}) do
|
||||
@@ -129,19 +151,10 @@ defmodule BDS.ImportExecution do
|
||||
started_at
|
||||
)
|
||||
|
||||
put_in(acc, [:tags, :created], acc.tags.created + 1)
|
||||
{:ok, put_in(acc, [:tags, :created], acc.tags.created + 1)}
|
||||
|
||||
{:error, _reason} ->
|
||||
notify_progress(
|
||||
on_progress,
|
||||
"tags",
|
||||
index,
|
||||
total,
|
||||
"skipped_tag:#{item.name}",
|
||||
started_at
|
||||
)
|
||||
|
||||
put_in(acc, [:tags, :skipped], acc.tags.skipped + 1)
|
||||
{:error, reason} ->
|
||||
{:error, {:taxonomy_import_failed, item.name, reason}}
|
||||
end
|
||||
end
|
||||
end)
|
||||
@@ -161,8 +174,7 @@ defmodule BDS.ImportExecution do
|
||||
total = length(items)
|
||||
phase = Atom.to_string(bucket)
|
||||
|
||||
Enum.with_index(items, 1)
|
||||
|> Enum.reduce(result, fn {item, index}, acc ->
|
||||
transaction_reduce(items, result, fn item, index, acc ->
|
||||
notify_progress(on_progress, phase, index, total, "processing:#{item.title}", started_at)
|
||||
|
||||
execute_post_item(
|
||||
@@ -188,9 +200,7 @@ defmodule BDS.ImportExecution do
|
||||
) do
|
||||
total = length(items)
|
||||
|
||||
items
|
||||
|> Enum.with_index(1)
|
||||
|> Enum.reduce(result, fn {item, index}, acc ->
|
||||
transaction_reduce(items, result, fn item, index, acc ->
|
||||
notify_progress(
|
||||
on_progress,
|
||||
"media",
|
||||
@@ -202,24 +212,21 @@ defmodule BDS.ImportExecution do
|
||||
|
||||
cond do
|
||||
item.status == "missing" ->
|
||||
put_in(acc, [:media, :skipped], acc.media.skipped + 1)
|
||||
{:ok, put_in(acc, [:media, :skipped], acc.media.skipped + 1)}
|
||||
|
||||
item.status in ["update", "content-duplicate", "duplicate"] ->
|
||||
put_in(acc, [:media, :skipped], acc.media.skipped + 1)
|
||||
{:ok, put_in(acc, [:media, :skipped], acc.media.skipped + 1)}
|
||||
|
||||
item.status == "conflict" and resolve_conflict(item) == "ignore" ->
|
||||
put_in(acc, [:media, :skipped], acc.media.skipped + 1)
|
||||
{:ok, put_in(acc, [:media, :skipped], acc.media.skipped + 1)}
|
||||
|
||||
true ->
|
||||
case import_media_item(project_id, item, default_author, uploads_folder_path, acc) do
|
||||
{:ok, _media} ->
|
||||
put_in(acc, [:media, :imported], acc.media.imported + 1)
|
||||
{:ok, put_in(acc, [:media, :imported], acc.media.imported + 1)}
|
||||
|
||||
{:error, reason} ->
|
||||
acc
|
||||
|> put_in([:media, :errors], acc.media.errors + 1)
|
||||
|> Map.update!(:errors, &(&1 ++ [inspect(reason)]))
|
||||
|> Map.put(:success, false)
|
||||
{:error, {:media_import_failed, item.filename, reason}}
|
||||
end
|
||||
end
|
||||
end)
|
||||
@@ -236,37 +243,33 @@ defmodule BDS.ImportExecution do
|
||||
) do
|
||||
cond do
|
||||
item.status in ["update", "content-duplicate", "duplicate"] ->
|
||||
put_in(result, [bucket, :skipped], get_in(result, [bucket, :skipped]) + 1)
|
||||
{:ok, put_in(result, [bucket, :skipped], get_in(result, [bucket, :skipped]) + 1)}
|
||||
|
||||
item.status == "conflict" and resolve_conflict(item) == "ignore" ->
|
||||
put_in(result, [bucket, :skipped], get_in(result, [bucket, :skipped]) + 1)
|
||||
{:ok, put_in(result, [bucket, :skipped], get_in(result, [bucket, :skipped]) + 1)}
|
||||
|
||||
item.status == "conflict" and resolve_conflict(item) == "overwrite" ->
|
||||
case overwrite_post_item(item, default_author, tag_mapping, category_mapping) do
|
||||
{:ok, post} ->
|
||||
result
|
||||
|> put_in([bucket, :imported], get_in(result, [bucket, :imported]) + 1)
|
||||
|> track_wp_id(item, post)
|
||||
{:ok,
|
||||
result
|
||||
|> put_in([bucket, :imported], get_in(result, [bucket, :imported]) + 1)
|
||||
|> track_wp_id(item, post)}
|
||||
|
||||
{:error, reason} ->
|
||||
result
|
||||
|> put_in([bucket, :errors], get_in(result, [bucket, :errors]) + 1)
|
||||
|> Map.update!(:errors, &(&1 ++ [inspect(reason)]))
|
||||
|> Map.put(:success, false)
|
||||
{:error, {:post_import_failed, bucket, post_item_label(item), reason}}
|
||||
end
|
||||
|
||||
true ->
|
||||
case create_post_item(project_id, item, default_author, tag_mapping, category_mapping) do
|
||||
{:ok, post} ->
|
||||
result
|
||||
|> put_in([bucket, :imported], get_in(result, [bucket, :imported]) + 1)
|
||||
|> track_wp_id(item, post)
|
||||
{:ok,
|
||||
result
|
||||
|> put_in([bucket, :imported], get_in(result, [bucket, :imported]) + 1)
|
||||
|> track_wp_id(item, post)}
|
||||
|
||||
{:error, reason} ->
|
||||
result
|
||||
|> put_in([bucket, :errors], get_in(result, [bucket, :errors]) + 1)
|
||||
|> Map.update!(:errors, &(&1 ++ [inspect(reason)]))
|
||||
|> Map.put(:success, false)
|
||||
{:error, {:post_import_failed, bucket, post_item_label(item), reason}}
|
||||
end
|
||||
end
|
||||
end
|
||||
@@ -557,6 +560,57 @@ defmodule BDS.ImportExecution do
|
||||
|
||||
defp parse_timestamp(_value), do: nil
|
||||
|
||||
defp transaction_reduce(items, initial_acc, reducer) do
|
||||
items
|
||||
|> Enum.chunk_every(@transaction_batch_size)
|
||||
|> Enum.reduce_while({:ok, initial_acc, 0}, fn chunk, {:ok, acc, processed} ->
|
||||
case run_repo_transaction(fn ->
|
||||
Enum.with_index(chunk, 1)
|
||||
|> Enum.reduce_while(acc, fn {item, index}, chunk_acc ->
|
||||
case reducer.(item, processed + index, chunk_acc) do
|
||||
{:ok, next_acc} -> {:cont, next_acc}
|
||||
{:error, reason} -> Repo.rollback(reason)
|
||||
end
|
||||
end)
|
||||
end) do
|
||||
{:ok, next_acc} -> {:cont, {:ok, next_acc, processed + length(chunk)}}
|
||||
{:error, reason} -> {:halt, {:error, reason}}
|
||||
end
|
||||
end)
|
||||
|> case do
|
||||
{:ok, acc, _processed} -> {:ok, acc}
|
||||
{:error, reason} -> {:error, reason}
|
||||
end
|
||||
end
|
||||
|
||||
defp run_repo_transaction(fun) when is_function(fun, 0) do
|
||||
Repo.transaction(fun)
|
||||
rescue
|
||||
error -> {:error, error}
|
||||
catch
|
||||
kind, reason -> {:error, {kind, reason}}
|
||||
end
|
||||
|
||||
defp post_item_label(item) do
|
||||
Map.get(item, :slug) || Map.get(item, :title) || inspect(Map.get(item, :wp_id))
|
||||
end
|
||||
|
||||
defp format_import_error({:post_import_failed, bucket, label, reason}) do
|
||||
"#{bucket} import failed for #{label}: #{inspect(reason)}"
|
||||
end
|
||||
|
||||
defp format_import_error({:media_import_failed, filename, reason}) do
|
||||
"media import failed for #{filename}: #{inspect(reason)}"
|
||||
end
|
||||
|
||||
defp format_import_error({:taxonomy_import_failed, name, reason}) do
|
||||
"tag import failed for #{name}: #{inspect(reason)}"
|
||||
end
|
||||
|
||||
defp format_import_error(%_{} = error), do: Exception.message(error)
|
||||
defp format_import_error({kind, reason}), do: "#{kind}: #{inspect(reason)}"
|
||||
defp format_import_error(reason), do: inspect(reason)
|
||||
|
||||
defp uploads_source_path(relative_path, uploads_folder_path)
|
||||
|
||||
defp uploads_source_path(relative_path, uploads_folder_path)
|
||||
|
||||
Reference in New Issue
Block a user