Files
bDS2/lib/bds/tasks.ex

375 lines
10 KiB
Elixir

defmodule BDS.Tasks do
@moduledoc false
use GenServer
# Fallback cap on simultaneously running supervised tasks; used when the
# `:tasks` config of the `:bds` application has no `:max_concurrent` entry.
@default_max_concurrent 3
# Fallback minimum gap, in milliseconds, between two recorded progress updates
# of one task; used when the config has no `:progress_throttle_ms` entry.
@default_progress_throttle_ms 250
@doc """
Starts the task server registered under this module's name.

The argument is ignored; the server always starts from an empty init argument.
"""
def start_link(_opts) do
  init_arg = %{}
  GenServer.start_link(__MODULE__, init_arg, name: __MODULE__)
end
@doc """
Registers a task called `name` and asks the server to run `work`.

`work` is a one-argument function; it receives a reporter function taking
`(value, message)` that forwards progress to the server. The task starts at
once when a concurrency slot is free and is queued otherwise. `attrs` may carry
`group_id` / `group_name` under atom or string keys.

Returns `{:ok, task}` with the public task map as it was at registration.
"""
def submit_task(name, work, attrs \\ %{})
    when is_binary(name) and is_function(work, 1) and is_map(attrs) do
  request = {:submit_task, name, work, attrs}
  GenServer.call(__MODULE__, request)
end
@doc """
Looks up a task by id.

Returns the public task map, or `nil` when the id is not known.
"""
def get_task(task_id) when is_binary(task_id) do
  request = {:get_task, task_id}
  GenServer.call(__MODULE__, request)
end
@doc """
Returns a summary map of all running and pending tasks.

The map has the keys `:active_count`, `:running_count`, `:pending_count`,
`:running_task_message`, `:running_task_overflow` and `:tasks`.
"""
def status_snapshot do
  request = :status_snapshot
  GenServer.call(__MODULE__, request)
end
@doc """
Cancels a supervised task that is running or still queued.

Returns `:ok` on success, `{:error, :not_found}` for an unknown id and
`{:error, :not_running}` when the task exists but is neither running under the
server nor queued.
"""
def cancel_task(task_id) when is_binary(task_id) do
  request = {:cancel_task, task_id}
  GenServer.call(__MODULE__, request)
end
@doc """
Registers a task whose work is not started by this server.

The task is recorded directly in the `:running` state with `started_at` set.
Returns `{:ok, task}` with the public task map.
"""
def register_external_task(name, attrs \\ %{}) when is_binary(name) and is_map(attrs) do
  request = {:register_external_task, name, attrs}
  GenServer.call(__MODULE__, request)
end
@doc """
Reports a progress `value` and `message` for a task.

Always returns `:ok`; the server may drop the update (unknown id, or throttled
because the previous update was too recent).
"""
def report_progress(task_id, value, message) when is_binary(task_id) do
  request = {:report_progress, task_id, value, message}
  GenServer.call(__MODULE__, request)
end
@doc """
Marks a task as completed with full progress. Returns `:ok`.
"""
def complete_task(task_id) when is_binary(task_id) do
  request = {:complete_task, task_id}
  GenServer.call(__MODULE__, request)
end
@doc """
Marks a task as failed, storing `error_message` as the task's message.
Returns `:ok`.
"""
def fail_task(task_id, error_message) when is_binary(task_id) do
  request = {:fail_task, task_id, error_message}
  GenServer.call(__MODULE__, request)
end
# Builds the initial server state:
#   tasks       - task id => task map (every task ever registered)
#   queue       - list of {task_id, work} waiting for a free slot, oldest first
#   running     - task id => %{pid, ref} of supervised tasks in flight
#   ref_to_task - monitor ref => task id, for routing replies and :DOWN messages
@impl true
def init(_state) do
  initial_state = %{tasks: %{}, queue: [], running: %{}, ref_to_task: %{}}
  {:ok, initial_state}
end
# Registers a new pending task, then either starts it right away (a concurrency
# slot is free) or appends it to the back of the queue. The reply carries the
# task as registered, i.e. still in the :pending state.
@impl true
def handle_call({:submit_task, name, work, attrs}, _from, state) do
  task = new_task(name, :pending, attrs)
  registered = put_in(state, [:tasks, task.id], task)

  next_state =
    if map_size(registered.running) >= max_concurrent() do
      Map.update!(registered, :queue, fn queue -> queue ++ [{task.id, work}] end)
    else
      start_task(registered, task.id, work)
    end

  {:reply, {:ok, public_task(task)}, next_state}
end
# Replies with the public view of the task, or nil when the id is unknown.
def handle_call({:get_task, task_id}, _from, state) do
  reply =
    case Map.get(state.tasks, task_id) do
      nil -> nil
      task -> public_task(task)
    end

  {:reply, reply, state}
end
# Replies with the summary of active tasks; the state is left untouched.
def handle_call(:status_snapshot, _from, state) do
  snapshot = build_status_snapshot(state)
  {:reply, snapshot, state}
end
# Cancels a task. A running supervised task has its process killed and its slot
# freed; a queued task is dropped from the queue. In both cases the task is
# marked :cancelled and queued work is started if slots allow. Tasks that are
# known but neither running here nor queued yield {:error, :not_running};
# unknown ids yield {:error, :not_found}.
def handle_call({:cancel_task, task_id}, _from, state) do
  same_id? = fn {queued_id, _work} -> queued_id == task_id end

  case Map.fetch(state.running, task_id) do
    {:ok, %{pid: pid, ref: ref}} ->
      Process.exit(pid, :kill)

      cancelled =
        update_task(state, task_id, %{status: :cancelled, finished_at: DateTime.utc_now()})

      freed = remove_running(cancelled, task_id, ref)
      {:reply, :ok, start_queued_tasks(freed)}

    :error ->
      cond do
        Enum.any?(state.queue, same_id?) ->
          cancelled =
            update_task(state, task_id, %{status: :cancelled, finished_at: DateTime.utc_now()})

          dequeued = %{cancelled | queue: Enum.reject(cancelled.queue, same_id?)}
          {:reply, :ok, start_queued_tasks(dequeued)}

        is_nil(state.tasks[task_id]) ->
          {:reply, {:error, :not_found}, state}

        true ->
          {:reply, {:error, :not_running}, state}
      end
  end
end
# Records a task that is executed outside this server: it is stored directly as
# :running with started_at set, and takes no entry in `running` or the queue.
def handle_call({:register_external_task, name, attrs}, _from, state) do
  base = new_task(name, :running, attrs)
  task = %{base | started_at: DateTime.utc_now()}
  tasks = Map.put(state.tasks, task.id, task)

  {:reply, {:ok, public_task(task)}, %{state | tasks: tasks}}
end
# Synchronous progress report; always replies :ok, even if the update is dropped.
def handle_call({:report_progress, task_id, value, message}, _from, state) do
  next_state = maybe_report_progress(state, task_id, value, message)
  {:reply, :ok, next_state}
end
# Marks the task completed (progress forced to 1.0, finished_at stamped), then
# gives queued work a chance to start.
def handle_call({:complete_task, task_id}, _from, state) do
  completion = %{status: :completed, progress: 1.0, finished_at: DateTime.utc_now()}
  completed = update_task(state, task_id, completion)

  {:reply, :ok, start_queued_tasks(completed)}
end
# Marks the task failed, keeping the caller's error text in the task's
# `message` field, then gives queued work a chance to start.
def handle_call({:fail_task, task_id, error_message}, _from, state) do
  failure = %{status: :failed, message: error_message, finished_at: DateTime.utc_now()}
  failed = update_task(state, task_id, failure)

  {:reply, :ok, start_queued_tasks(failed)}
end
# Progress message sent by the reporter function handed to supervised work.
@impl true
def handle_info({:task_progress, task_id, value, message}, state) do
  next_state = maybe_report_progress(state, task_id, value, message)
  {:noreply, next_state}
end
# Result message of a supervised task. Two-tuples whose first element is not a
# tracked monitor ref are ignored. For a tracked ref the monitor is dropped
# (flushing any pending :DOWN), the outcome is stored unless the task was
# already cancelled, the slot is freed and queued work is started.
def handle_info({ref, result}, state) do
  case Map.get(state.ref_to_task, ref) do
    nil ->
      {:noreply, state}

    task_id ->
      Process.demonitor(ref, [:flush])
      task = Map.get(state.tasks, task_id)

      recorded =
        if task.status == :cancelled do
          state
        else
          outcome =
            case normalize_result(result) do
              {:error, reason} ->
                %{status: :failed, error: reason, finished_at: DateTime.utc_now()}

              {:ok, value} ->
                %{
                  status: :completed,
                  result: value,
                  progress: 1.0,
                  finished_at: DateTime.utc_now()
                }
            end

          update_task(state, task_id, outcome)
        end

      freed = remove_running(recorded, task_id, ref)
      {:noreply, start_queued_tasks(freed)}
  end
end
# Monitor notification for a supervised task process.
#
# Untracked refs (already finished or cancelled tasks) are ignored. A tracked
# ref means the process ended WITHOUT its result having been handled: the
# result handler drops the ref and flushes the monitor before a :DOWN can be
# seen. So every exit reason, including :normal (e.g. the work function called
# `exit(:normal)`), is recorded as a failure. Leaving such a task untouched
# would keep it in the :running state forever. Cancelled tasks keep their
# status. In all cases the slot is freed and queued work is started.
def handle_info({:DOWN, ref, :process, _pid, reason}, state) do
  case Map.get(state.ref_to_task, ref) do
    nil ->
      {:noreply, state}

    task_id ->
      task = state.tasks[task_id]

      recorded =
        if task.status == :cancelled do
          state
        else
          update_task(state, task_id, %{
            status: :failed,
            error: reason,
            finished_at: DateTime.utc_now()
          })
        end

      next_state =
        recorded
        |> remove_running(task_id, ref)
        |> start_queued_tasks()

      {:noreply, next_state}
  end
end
# Launches `work` under BDS.Tasks.TaskSupervisor (monitored, not linked), marks
# the task :running and records its pid/ref for later routing. The work
# function is handed a reporter that sends progress messages to this server.
defp start_task(state, task_id, work) do
  report = fn value, message ->
    send(__MODULE__, {:task_progress, task_id, value, message})
  end

  %{pid: pid, ref: ref} =
    Task.Supervisor.async_nolink(BDS.Tasks.TaskSupervisor, fn -> work.(report) end)

  started = update_task(state, task_id, %{status: :running, started_at: DateTime.utc_now()})

  %{
    started
    | running: Map.put(started.running, task_id, %{pid: pid, ref: ref}),
      ref_to_task: Map.put(started.ref_to_task, ref, task_id)
  }
end
# Starts queued tasks, oldest first, until the concurrency limit is reached or
# the queue is empty.
defp start_queued_tasks(state) do
  at_capacity? = map_size(state.running) >= max_concurrent()

  case state.queue do
    _queue when at_capacity? ->
      state

    [] ->
      state

    [{task_id, work} | remaining] ->
      %{state | queue: remaining}
      |> start_task(task_id, work)
      |> start_queued_tasks()
  end
end
# Forgets a supervised task's bookkeeping: its entry in `running` and the
# monitor-ref mapping. Missing entries are simply ignored.
defp remove_running(state, task_id, ref) do
  %{
    state
    | running: Map.delete(state.running, task_id),
      ref_to_task: Map.delete(state.ref_to_task, ref)
  }
end
# Stores a progress update for a known task, subject to throttling: the update
# is recorded when the task has never reported, when at least the configured
# throttle interval has passed since the last recorded update, or when the
# value equals 1.0. Otherwise (or for unknown ids) the state is unchanged.
defp maybe_report_progress(state, task_id, value, message) do
  task = state.tasks[task_id]

  if is_nil(task) do
    state
  else
    now_ms = System.monotonic_time(:millisecond)

    due? =
      case Map.get(task, :last_reported_at) do
        nil -> true
        last -> now_ms - last >= progress_throttle_ms() or value == 1.0
      end

    if due? do
      update_task(state, task_id, %{progress: value, message: message, last_reported_at: now_ms})
    else
      state
    end
  end
end
# Builds a fresh task map with a unique "task-<n>" id, the given name and
# status, optional group info taken from `attrs`, and created_at set to now.
# All other fields start out as nil.
defp new_task(name, status, attrs) do
  sequence = System.unique_integer([:positive, :monotonic])

  %{
    id: "task-#{sequence}",
    name: name,
    status: status,
    progress: nil,
    message: nil,
    group_id: attr(attrs, :group_id),
    group_name: attr(attrs, :group_name),
    created_at: DateTime.utc_now(),
    started_at: nil,
    finished_at: nil,
    result: nil,
    error: nil,
    last_reported_at: nil
  }
end
# Merges `attrs` into the stored task with the given id and returns the new
# state. Unknown ids leave the state untouched.
#
# The lookup is done by pattern match rather than `update_in/3`: going through
# Access on a missing key would insert `task_id => nil` into `tasks`, and that
# nil entry later breaks every traversal that reads `task.status`.
defp update_task(state, task_id, attrs) do
  case state.tasks do
    %{^task_id => task} when is_map(task) ->
      %{state | tasks: Map.put(state.tasks, task_id, Map.merge(task, attrs))}

    _unknown ->
      state
  end
end
# Public view of a task: the internal throttling timestamp is stripped.
# nil passes through unchanged.
defp public_task(nil), do: nil
defp public_task(task), do: Map.delete(task, :last_reported_at)
# Summarises the active (running or pending) tasks: counts, a headline message
# derived from the first task in sort order, how many further tasks exist
# beyond that one, and the public view of every active task.
defp build_status_snapshot(state) do
  active = active_tasks(state)
  count_with = fn status -> Enum.count(active, fn task -> task.status == status end) end

  %{
    active_count: length(active),
    running_count: count_with.(:running),
    pending_count: count_with.(:pending),
    running_task_message: running_task_message(active),
    running_task_overflow: running_task_overflow(active),
    tasks: Enum.map(active, fn task -> public_task(task) end)
  }
end
# All tasks that are running or pending, ordered by task_sort_key/1.
defp active_tasks(state) do
  active? = fn task -> task.status in [:running, :pending] end

  candidates =
    state.tasks
    |> Map.values()
    |> Enum.filter(active?)

  Enum.sort_by(candidates, fn task -> task_sort_key(task) end)
end
# Sort key for active tasks: running tasks before pending ones, then by start
# time (creation time for tasks that have not started), oldest first.
#
# The timestamp is converted to Unix microseconds because DateTime structs
# compare structurally under term ordering (field by field in key order, so
# day before month before year), which is not chronological.
defp task_sort_key(task) do
  timestamp = task.started_at || task.created_at
  {task_priority(task.status), DateTime.to_unix(timestamp, :microsecond)}
end

# Rank of a status within the active list; lower sorts first.
defp task_priority(:running), do: 0
defp task_priority(:pending), do: 1
# Headline text for the first active task: "Queued: <name>" for a pending task,
# "<name>: <message>" when a non-empty message is present, else just the name.
# An empty list yields nil.
defp running_task_message([]), do: nil

defp running_task_message([%{status: :pending, name: name} | _rest]), do: "Queued: #{name}"

defp running_task_message([%{name: name, message: message} | _rest])
     when is_binary(message) and message != "",
     do: "#{name}: #{message}"

defp running_task_message([%{name: name} | _rest]), do: name
# Number of active tasks beyond the first one (0 for an empty list).
defp running_task_overflow([]), do: 0
defp running_task_overflow([_first | others]), do: length(others)
# Coerces a work function's return value into {:ok, value} | {:error, reason}:
# two-element tuples tagged :ok or :error pass through, anything else is
# wrapped as {:ok, value}.
defp normalize_result(result) do
  case result do
    {tag, _payload} when tag in [:ok, :error] -> result
    other -> {:ok, other}
  end
end
# Concurrency limit, read at call time from the `:tasks` config of the `:bds`
# application, falling back to the module default.
defp max_concurrent do
  settings = Application.get_env(:bds, :tasks, [])
  Keyword.get(settings, :max_concurrent, @default_max_concurrent)
end
# Minimum gap between recorded progress updates, in milliseconds, read at call
# time from the `:tasks` config of the `:bds` application, falling back to the
# module default.
defp progress_throttle_ms do
  settings = Application.get_env(:bds, :tasks, [])
  Keyword.get(settings, :progress_throttle_ms, @default_progress_throttle_ms)
end
# Reads `key` from `attrs`, accepting either the atom key or its string form.
# A present atom key wins even when its value is nil; nil is returned when
# neither form is present.
defp attr(attrs, key) do
  case attrs do
    %{^key => value} -> value
    _no_atom_key -> Map.get(attrs, Atom.to_string(key))
  end
end
end