feat: more complete metadata diff, scp publishing and rendering context
This commit is contained in:
@@ -10,7 +10,8 @@ defmodule BDS.Tasks do
|
||||
GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
|
||||
end
|
||||
|
||||
def submit_task(name, work, attrs \\ %{}) when is_binary(name) and is_function(work, 1) and is_map(attrs) do
|
||||
def submit_task(name, work, attrs \\ %{})
|
||||
when is_binary(name) and is_function(work, 1) and is_map(attrs) do
|
||||
GenServer.call(__MODULE__, {:submit_task, name, work, attrs})
|
||||
end
|
||||
|
||||
@@ -57,7 +58,8 @@ defmodule BDS.Tasks do
|
||||
if map_size(next_state.running) < max_concurrent() do
|
||||
{:reply, {:ok, public_task(task)}, start_task(next_state, task.id, work)}
|
||||
else
|
||||
{:reply, {:ok, public_task(task)}, %{next_state | queue: next_state.queue ++ [{task.id, work}]}}
|
||||
{:reply, {:ok, public_task(task)},
|
||||
%{next_state | queue: next_state.queue ++ [{task.id, work}]}}
|
||||
end
|
||||
end
|
||||
|
||||
@@ -83,7 +85,9 @@ defmodule BDS.Tasks do
|
||||
next_state =
|
||||
state
|
||||
|> update_task(task_id, %{status: :cancelled, finished_at: DateTime.utc_now()})
|
||||
|> Map.update!(:queue, fn queue -> Enum.reject(queue, fn {queued_id, _work} -> queued_id == task_id end) end)
|
||||
|> Map.update!(:queue, fn queue ->
|
||||
Enum.reject(queue, fn {queued_id, _work} -> queued_id == task_id end)
|
||||
end)
|
||||
|> start_queued_tasks()
|
||||
|
||||
{:reply, :ok, next_state}
|
||||
@@ -109,7 +113,11 @@ defmodule BDS.Tasks do
|
||||
def handle_call({:complete_task, task_id}, _from, state) do
|
||||
next_state =
|
||||
state
|
||||
|> update_task(task_id, %{status: :completed, progress: 1.0, finished_at: DateTime.utc_now()})
|
||||
|> update_task(task_id, %{
|
||||
status: :completed,
|
||||
progress: 1.0,
|
||||
finished_at: DateTime.utc_now()
|
||||
})
|
||||
|> start_queued_tasks()
|
||||
|
||||
{:reply, :ok, next_state}
|
||||
@@ -118,7 +126,11 @@ defmodule BDS.Tasks do
|
||||
def handle_call({:fail_task, task_id, error_message}, _from, state) do
|
||||
next_state =
|
||||
state
|
||||
|> update_task(task_id, %{status: :failed, message: error_message, finished_at: DateTime.utc_now()})
|
||||
|> update_task(task_id, %{
|
||||
status: :failed,
|
||||
message: error_message,
|
||||
finished_at: DateTime.utc_now()
|
||||
})
|
||||
|> start_queued_tasks()
|
||||
|
||||
{:reply, :ok, next_state}
|
||||
@@ -146,8 +158,16 @@ defmodule BDS.Tasks do
|
||||
_status ->
|
||||
attrs =
|
||||
case normalize_result(result) do
|
||||
{:ok, value} -> %{status: :completed, result: value, progress: 1.0, finished_at: DateTime.utc_now()}
|
||||
{:error, reason} -> %{status: :failed, error: reason, finished_at: DateTime.utc_now()}
|
||||
{:ok, value} ->
|
||||
%{
|
||||
status: :completed,
|
||||
result: value,
|
||||
progress: 1.0,
|
||||
finished_at: DateTime.utc_now()
|
||||
}
|
||||
|
||||
{:error, reason} ->
|
||||
%{status: :failed, error: reason, finished_at: DateTime.utc_now()}
|
||||
end
|
||||
|
||||
update_task(state, task_id, attrs)
|
||||
@@ -176,7 +196,11 @@ defmodule BDS.Tasks do
|
||||
state
|
||||
|
||||
true ->
|
||||
update_task(state, task_id, %{status: :failed, error: reason, finished_at: DateTime.utc_now()})
|
||||
update_task(state, task_id, %{
|
||||
status: :failed,
|
||||
error: reason,
|
||||
finished_at: DateTime.utc_now()
|
||||
})
|
||||
end
|
||||
|> remove_running(task_id, ref)
|
||||
|> start_queued_tasks()
|
||||
@@ -186,7 +210,9 @@ defmodule BDS.Tasks do
|
||||
end
|
||||
|
||||
defp start_task(state, task_id, work) do
|
||||
reporter = fn value, message -> send(__MODULE__, {:task_progress, task_id, value, message}) end
|
||||
reporter = fn value, message ->
|
||||
send(__MODULE__, {:task_progress, task_id, value, message})
|
||||
end
|
||||
|
||||
task =
|
||||
Task.Supervisor.async_nolink(BDS.Tasks.TaskSupervisor, fn ->
|
||||
@@ -209,6 +235,7 @@ defmodule BDS.Tasks do
|
||||
|
||||
true ->
|
||||
[{task_id, work} | remaining] = state.queue
|
||||
|
||||
state
|
||||
|> Map.put(:queue, remaining)
|
||||
|> start_task(task_id, work)
|
||||
@@ -231,8 +258,13 @@ defmodule BDS.Tasks do
|
||||
now_ms = System.monotonic_time(:millisecond)
|
||||
last_reported_at = Map.get(task, :last_reported_at)
|
||||
|
||||
if is_nil(last_reported_at) or now_ms - last_reported_at >= progress_throttle_ms() or value == 1.0 do
|
||||
update_task(state, task_id, %{progress: value, message: message, last_reported_at: now_ms})
|
||||
if is_nil(last_reported_at) or now_ms - last_reported_at >= progress_throttle_ms() or
|
||||
value == 1.0 do
|
||||
update_task(state, task_id, %{
|
||||
progress: value,
|
||||
message: message,
|
||||
last_reported_at: now_ms
|
||||
})
|
||||
else
|
||||
state
|
||||
end
|
||||
|
||||
Reference in New Issue
Block a user