defmodule BDS.Tasks do
  @moduledoc false

  # In-memory registry and scheduler for background tasks, run as a single
  # GenServer registered under the module name.
  #
  # Two kinds of tasks are tracked:
  #
  #   * submitted tasks - a 1-arity function executed under
  #     `BDS.Tasks.TaskSupervisor` via `Task.Supervisor.async_nolink/2`. At most
  #     `max_concurrent/0` of them run at once; the rest wait in a FIFO queue.
  #   * external tasks - work that runs elsewhere and is only tracked here; the
  #     owner drives it with `report_progress/3`, `complete_task/1` and
  #     `fail_task/2`.
  #
  # Server state:
  #
  #   * `:tasks`       - task id => task map (see `new_task/3`)
  #   * `:queue`       - list of `{task_id, work}` waiting for a free slot
  #   * `:running`     - task id => `%{pid: pid, ref: ref}` of the worker process
  #   * `:ref_to_task` - worker monitor ref => task id (reverse lookup)

  use GenServer

  @default_max_concurrent 3
  @default_progress_throttle_ms 250

  # ---------------------------------------------------------------------------
  # Client API
  # ---------------------------------------------------------------------------

  @doc """
  Starts the task server and registers it under the module name.

  The options argument is ignored.
  """
  @spec start_link(term()) :: GenServer.on_start()
  def start_link(_opts) do
    GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
  end

  @doc """
  Submits `work` for execution and returns `{:ok, task}`.

  `work` receives a 2-arity reporter function `reporter.(value, message)` that
  forwards progress updates to the server. `attrs` may carry `:group_id` and
  `:group_name` (atom or string keys). The returned task map is the snapshot
  taken at creation time, so its status is `:pending` even when the task is
  started right away.
  """
  @spec submit_task(String.t(), ((term(), term() -> term()) -> term()), map()) :: {:ok, map()}
  def submit_task(name, work, attrs \\ %{})
      when is_binary(name) and is_function(work, 1) and is_map(attrs) do
    GenServer.call(__MODULE__, {:submit_task, name, work, attrs})
  end

  @doc """
  Returns the public view of the task with the given id, or `nil` when unknown.
  """
  @spec get_task(String.t()) :: map() | nil
  def get_task(task_id) when is_binary(task_id) do
    GenServer.call(__MODULE__, {:get_task, task_id})
  end

  @doc """
  Cancels a running or queued submitted task.

  Returns `:ok` on success, `{:error, :not_found}` for an unknown id and
  `{:error, :not_running}` for a known task that is neither running under this
  server nor queued.
  """
  @spec cancel_task(String.t()) :: :ok | {:error, :not_found | :not_running}
  def cancel_task(task_id) when is_binary(task_id) do
    GenServer.call(__MODULE__, {:cancel_task, task_id})
  end

  @doc """
  Registers a task whose work is performed outside this server.

  The task is created directly in the `:running` state and returned as
  `{:ok, task}`.
  """
  @spec register_external_task(String.t(), map()) :: {:ok, map()}
  def register_external_task(name, attrs \\ %{}) when is_binary(name) and is_map(attrs) do
    GenServer.call(__MODULE__, {:register_external_task, name, attrs})
  end

  @doc """
  Records a progress `value` and `message` for a task. Always returns `:ok`.

  Updates are throttled per task (see `progress_throttle_ms/0`); a `value` equal
  to `1.0` is always recorded. Unknown ids are ignored.
  """
  @spec report_progress(String.t(), term(), term()) :: :ok
  def report_progress(task_id, value, message) when is_binary(task_id) do
    GenServer.call(__MODULE__, {:report_progress, task_id, value, message})
  end

  @doc """
  Marks a task as `:completed` with progress `1.0`. Always returns `:ok`;
  unknown ids are ignored.
  """
  @spec complete_task(String.t()) :: :ok
  def complete_task(task_id) when is_binary(task_id) do
    GenServer.call(__MODULE__, {:complete_task, task_id})
  end

  @doc """
  Marks a task as `:failed`, storing `error_message` in the task's `:message`
  field. Always returns `:ok`; unknown ids are ignored.
  """
  @spec fail_task(String.t(), term()) :: :ok
  def fail_task(task_id, error_message) when is_binary(task_id) do
    GenServer.call(__MODULE__, {:fail_task, task_id, error_message})
  end

  # ---------------------------------------------------------------------------
  # Server callbacks
  # ---------------------------------------------------------------------------

  @impl true
  def init(_state) do
    {:ok, %{tasks: %{}, queue: [], running: %{}, ref_to_task: %{}}}
  end

  @impl true
  def handle_call({:submit_task, name, work, attrs}, _from, state) do
    task = new_task(name, :pending, attrs)

    # Always go through the queue so that a new submission can never overtake
    # tasks that are already waiting; `start_queued_tasks/1` starts it
    # immediately when a slot is free and nothing is ahead of it.
    next_state =
      state
      |> put_in([:tasks, task.id], task)
      |> Map.update!(:queue, &(&1 ++ [{task.id, work}]))
      |> start_queued_tasks()

    {:reply, {:ok, public_task(task)}, next_state}
  end

  def handle_call({:get_task, task_id}, _from, state) do
    {:reply, public_task(state.tasks[task_id]), state}
  end

  def handle_call({:cancel_task, task_id}, _from, state) do
    cond do
      Map.has_key?(state.running, task_id) ->
        %{pid: pid, ref: ref} = state.running[task_id]
        Process.exit(pid, :kill)
        # The task is finalised right here, so drop the monitor (and any :DOWN
        # already delivered) instead of leaving a stray message for later.
        Process.demonitor(ref, [:flush])

        next_state =
          state
          |> update_task(task_id, %{status: :cancelled, finished_at: DateTime.utc_now()})
          |> remove_running(task_id, ref)
          |> start_queued_tasks()

        {:reply, :ok, next_state}

      queued?(state, task_id) ->
        next_state =
          state
          |> update_task(task_id, %{status: :cancelled, finished_at: DateTime.utc_now()})
          |> Map.update!(:queue, fn queue ->
            Enum.reject(queue, fn {queued_id, _work} -> queued_id == task_id end)
          end)
          |> start_queued_tasks()

        {:reply, :ok, next_state}

      state.tasks[task_id] == nil ->
        {:reply, {:error, :not_found}, state}

      true ->
        {:reply, {:error, :not_running}, state}
    end
  end

  def handle_call({:register_external_task, name, attrs}, _from, state) do
    task =
      name
      |> new_task(:running, attrs)
      |> Map.put(:started_at, DateTime.utc_now())

    {:reply, {:ok, public_task(task)}, put_in(state, [:tasks, task.id], task)}
  end

  def handle_call({:report_progress, task_id, value, message}, _from, state) do
    {:reply, :ok, maybe_report_progress(state, task_id, value, message)}
  end

  def handle_call({:complete_task, task_id}, _from, state) do
    next_state =
      state
      |> update_task(task_id, %{
        status: :completed,
        progress: 1.0,
        finished_at: DateTime.utc_now()
      })
      |> start_queued_tasks()

    {:reply, :ok, next_state}
  end

  def handle_call({:fail_task, task_id, error_message}, _from, state) do
    next_state =
      state
      |> update_task(task_id, %{
        status: :failed,
        message: error_message,
        finished_at: DateTime.utc_now()
      })
      |> start_queued_tasks()

    {:reply, :ok, next_state}
  end

  @impl true
  # Progress sent by the reporter function handed to a worker.
  def handle_info({:task_progress, task_id, value, message}, state) do
    {:noreply, maybe_report_progress(state, task_id, value, message)}
  end

  # Reply of a worker started with `Task.Supervisor.async_nolink/2`.
  def handle_info({ref, result}, state) when is_reference(ref) do
    case state.ref_to_task[ref] do
      nil ->
        # Reply for a task that is no longer tracked (e.g. cancelled).
        {:noreply, state}

      task_id ->
        # The result is in; the matching :DOWN message is of no further use.
        Process.demonitor(ref, [:flush])

        next_state =
          state
          |> apply_result(task_id, result)
          |> remove_running(task_id, ref)
          |> start_queued_tasks()

        {:noreply, next_state}
    end
  end

  # A worker exited without its reply having been handled.
  def handle_info({:DOWN, ref, :process, _pid, reason}, state) do
    case state.ref_to_task[ref] do
      nil ->
        {:noreply, state}

      task_id ->
        task = state.tasks[task_id]

        updated =
          cond do
            task.status == :cancelled ->
              state

            reason == :normal ->
              state

            true ->
              update_task(state, task_id, %{
                status: :failed,
                error: reason,
                finished_at: DateTime.utc_now()
              })
          end

        next_state =
          updated
          |> remove_running(task_id, ref)
          |> start_queued_tasks()

        {:noreply, next_state}
    end
  end

  # Anything else is not addressed to this server; ignore it rather than crash
  # with a FunctionClauseError and lose every tracked task.
  def handle_info(_message, state) do
    {:noreply, state}
  end

  # ---------------------------------------------------------------------------
  # Internals
  # ---------------------------------------------------------------------------

  # Stores the outcome of a finished worker unless the task was cancelled.
  defp apply_result(state, task_id, result) do
    case state.tasks[task_id].status do
      :cancelled ->
        state

      _status ->
        attrs =
          case normalize_result(result) do
            {:ok, value} ->
              %{
                status: :completed,
                result: value,
                progress: 1.0,
                finished_at: DateTime.utc_now()
              }

            {:error, reason} ->
              %{status: :failed, error: reason, finished_at: DateTime.utc_now()}
          end

        update_task(state, task_id, attrs)
    end
  end

  # Spawns the worker for `task_id` and records its pid and monitor ref.
  defp start_task(state, task_id, work) do
    reporter = fn value, message ->
      send(__MODULE__, {:task_progress, task_id, value, message})
    end

    task = Task.Supervisor.async_nolink(BDS.Tasks.TaskSupervisor, fn -> work.(reporter) end)

    state
    |> update_task(task_id, %{status: :running, started_at: DateTime.utc_now()})
    |> put_in([:running, task_id], %{pid: task.pid, ref: task.ref})
    |> put_in([:ref_to_task, task.ref], task_id)
  end

  # Starts queued tasks in FIFO order until the concurrency limit is reached or
  # the queue is empty.
  defp start_queued_tasks(state) do
    cond do
      map_size(state.running) >= max_concurrent() ->
        state

      state.queue == [] ->
        state

      true ->
        [{task_id, work} | remaining] = state.queue

        state
        |> Map.put(:queue, remaining)
        |> start_task(task_id, work)
        |> start_queued_tasks()
    end
  end

  defp queued?(state, task_id) do
    Enum.any?(state.queue, fn {queued_id, _work} -> queued_id == task_id end)
  end

  defp remove_running(state, task_id, ref) do
    state
    |> update_in([:running], &Map.delete(&1, task_id))
    |> update_in([:ref_to_task], &Map.delete(&1, ref))
  end

  # Records progress when the task has never reported, the throttle interval has
  # elapsed since the last recorded report, or the value signals completion.
  defp maybe_report_progress(state, task_id, value, message) do
    case state.tasks[task_id] do
      nil ->
        state

      task ->
        now_ms = System.monotonic_time(:millisecond)
        last_reported_at = Map.get(task, :last_reported_at)

        if is_nil(last_reported_at) or
             now_ms - last_reported_at >= progress_throttle_ms() or
             value == 1.0 do
          update_task(state, task_id, %{
            progress: value,
            message: message,
            last_reported_at: now_ms
          })
        else
          state
        end
    end
  end

  defp new_task(name, status, attrs) do
    %{
      id: "task-" <> Integer.to_string(System.unique_integer([:positive, :monotonic])),
      name: name,
      status: status,
      progress: nil,
      message: nil,
      group_id: attr(attrs, :group_id),
      group_name: attr(attrs, :group_name),
      created_at: DateTime.utc_now(),
      started_at: nil,
      finished_at: nil,
      result: nil,
      error: nil,
      last_reported_at: nil
    }
  end

  # Merges `attrs` into an existing task. Unknown ids leave the state untouched:
  # going through `update_in/3` here would store a `task_id => nil` entry,
  # because the Access protocol inserts whatever the callback returns for a
  # missing map key.
  defp update_task(state, task_id, attrs) do
    case state.tasks do
      %{^task_id => task} when is_map(task) ->
        put_in(state, [:tasks, task_id], Map.merge(task, attrs))

      _tasks ->
        state
    end
  end

  # Strips bookkeeping fields that callers should not see.
  defp public_task(nil), do: nil
  defp public_task(task), do: Map.drop(task, [:last_reported_at])

  # Tagged tuples are passed through; any other return value counts as success.
  defp normalize_result({:ok, _value} = result), do: result
  defp normalize_result({:error, _reason} = result), do: result
  defp normalize_result(value), do: {:ok, value}

  # Both limits are read at call time from the `:tasks` keyword list of the
  # `:bds` application environment.
  defp max_concurrent do
    :bds
    |> Application.get_env(:tasks, [])
    |> Keyword.get(:max_concurrent, @default_max_concurrent)
  end

  defp progress_throttle_ms do
    :bds
    |> Application.get_env(:tasks, [])
    |> Keyword.get(:progress_throttle_ms, @default_progress_throttle_ms)
  end

  # Reads `key` from `attrs`, accepting either the atom key or its string form;
  # the atom key wins when both are present.
  defp attr(attrs, key) do
    with :error <- Map.fetch(attrs, key),
         :error <- Map.fetch(attrs, Atom.to_string(key)) do
      nil
    else
      {:ok, value} -> value
    end
  end
end