fix: A1-11 graceful preview shutdown drains inflight requests before stopping
This commit is contained in:
@@ -14,6 +14,10 @@ defmodule BDS.Preview do
|
||||
@host "127.0.0.1"
|
||||
@port 4123
|
||||
|
||||
# Max time to wait for inflight requests to finish during graceful shutdown
|
||||
# before remaining request tasks are forcibly terminated.
|
||||
@drain_timeout 5_000
|
||||
|
||||
# Starts the preview GenServer registered under this module's name.
# The options argument is ignored; the process is always seeded with an
# empty map, which `init/1` replaces with the real initial state.
def start_link(_opts), do: GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
|
||||
@@ -56,7 +60,7 @@ defmodule BDS.Preview do
|
||||
|
||||
@impl true
|
||||
# Builds the initial server state; the argument is ignored.
#
#   * `current`  - the running preview server map, or nil when nothing is served
#   * `stopping` - bookkeeping for a parked graceful stop, or nil when no
#                  drain is in progress
#
# The earlier `{:ok, %{current: nil}}` expression that preceded the return
# value was dead code (evaluated and discarded) and has been dropped.
def init(_state) do
  {:ok, %{current: nil, stopping: nil}}
end
|
||||
|
||||
@impl true
|
||||
@@ -78,15 +82,12 @@ defmodule BDS.Preview do
|
||||
{:reply, reply, next_state}
|
||||
end
|
||||
|
||||
def handle_call({:stop_preview, project_id}, _from, state) do
|
||||
next_state =
|
||||
if match?(%{project_id: ^project_id}, state.current) do
|
||||
stop_current_server(state)
|
||||
else
|
||||
state
|
||||
end
|
||||
|
||||
{:reply, :ok, next_state}
|
||||
# Stops the preview for `project_id`.
#
# When the running server belongs to that project the work is delegated to
# `begin_graceful_stop/2`, which decides whether to reply right away or to
# park the caller. Any other situation (no server, or a server for another
# project) is acknowledged with :ok and leaves the state untouched.
def handle_call({:stop_preview, project_id}, from, state) do
  case state.current do
    %{project_id: ^project_id} -> begin_graceful_stop(state, from)
    _other -> {:reply, :ok, state}
  end
end
|
||||
|
||||
def handle_call({:request, project_id, request_path, query_params}, _from, state) do
|
||||
@@ -141,6 +142,25 @@ defmodule BDS.Preview do
|
||||
end
|
||||
|
||||
@impl true
|
||||
# Registers a request task with the running server: the task is monitored and
# stored in `current.inflight` keyed by its monitor reference, so the matching
# :DOWN message can remove it again.
def handle_cast({:track_request, pid}, %{current: %{} = current} = state) when is_pid(pid) do
  monitor_ref = Process.monitor(pid)
  tracked = %{current | inflight: Map.put(current.inflight, monitor_ref, pid)}

  {:noreply, %{state | current: tracked}}
end

# No server is running (or the payload is not a pid): nothing to track.
def handle_cast({:track_request, _pid}, state) do
  {:noreply, state}
end
|
||||
|
||||
@impl true
|
||||
# A monitored process went down while a server is present: drop its entry
# from the inflight map, then let `maybe_finalize_stop/1` decide whether a
# parked stop can now complete.
def handle_info({:DOWN, ref, :process, _pid, _reason}, %{current: %{} = current} = state) do
  remaining = Map.delete(current.inflight, ref)
  drained = %{state | current: %{current | inflight: remaining}}

  {:noreply, maybe_finalize_stop(drained)}
end

# The drain window elapsed: hand over to `force_finalize_stop/1`.
def handle_info(:drain_timeout, state), do: {:noreply, force_finalize_stop(state)}

# Any other message is ignored.
def handle_info(_msg, state), do: {:noreply, state}
|
||||
@@ -287,9 +307,18 @@ defmodule BDS.Preview do
|
||||
defp accept_loop(listener, project_id) do
|
||||
case :gen_tcp.accept(listener) do
|
||||
{:ok, socket} ->
|
||||
Task.Supervisor.start_child(BDS.TCP.TaskSupervisor, fn ->
|
||||
serve_client(socket, project_id)
|
||||
end)
|
||||
case Task.Supervisor.start_child(BDS.TCP.TaskSupervisor, fn ->
|
||||
serve_client(socket, project_id)
|
||||
end) do
|
||||
{:ok, pid} ->
|
||||
# Hand the socket to the request task so an inflight request survives
|
||||
# the acceptor being shut down (it would otherwise close the socket).
|
||||
_ = :gen_tcp.controlling_process(socket, pid)
|
||||
GenServer.cast(__MODULE__, {:track_request, pid})
|
||||
|
||||
_other ->
|
||||
:ok
|
||||
end
|
||||
|
||||
accept_loop(listener, project_id)
|
||||
|
||||
@@ -412,14 +441,58 @@ defmodule BDS.Preview do
|
||||
end
|
||||
end
|
||||
|
||||
defp stop_current_server(%{current: %{listener: listener, acceptor_pid: acceptor_pid}} = state) do
|
||||
_ = :gen_tcp.close(listener)
|
||||
if is_pid(acceptor_pid), do: Process.exit(acceptor_pid, :normal)
|
||||
# Graceful shutdown: stop accepting new connections, then wait for inflight
|
||||
# request tasks to finish before reporting the server stopped. The stop call
|
||||
# is parked (no immediate reply) and finalized from the :DOWN handlers, so the
|
||||
# GenServer stays available to serve the requests it is draining.
|
||||
# A drain is already in progress: park this caller next to the earlier
# one(s) instead of re-closing the listener, arming a second timer and
# overwriting the first caller (which would then never receive a reply).
defp begin_graceful_stop(%{stopping: %{waiters: waiters} = stopping} = state, from) do
  {:noreply, %{state | stopping: %{stopping | waiters: [from | waiters]}}}
end

# First stop request for the running server.
#
# Closes the listener so no new connections are accepted. If nothing is
# inflight the caller is answered immediately and the server is dropped;
# otherwise the caller is parked in `stopping.waiters` and a :drain_timeout
# message is scheduled after @drain_timeout ms as the upper bound for the wait.
defp begin_graceful_stop(%{current: current} = state, from) do
  _ = :gen_tcp.close(current.listener)

  # NOTE(review): an exit signal with reason :normal is ignored by a process
  # that is not trapping exits, so this presumably relies on the closed
  # listener to end the acceptor - confirm against accept_loop's error branch.
  if is_pid(current.acceptor_pid), do: Process.exit(current.acceptor_pid, :normal)

  if map_size(current.inflight) == 0 do
    {:reply, :ok, %{state | current: nil, stopping: nil}}
  else
    timer = Process.send_after(self(), :drain_timeout, @drain_timeout)
    {:noreply, %{state | stopping: %{waiters: [from], timer: timer}}}
  end
end

# Completes a parked stop once the last inflight request is gone: the drain
# timer is cancelled, every parked caller gets :ok and the server is dropped.
# In every other situation the state is returned unchanged.
defp maybe_finalize_stop(%{stopping: %{}, current: %{inflight: inflight}} = state)
     when map_size(inflight) == 0 do
  %{release_stop_waiters(state) | current: nil}
end

defp maybe_finalize_stop(state), do: state

# Drain window elapsed: kill whatever is still inflight, answer every parked
# caller with :ok and drop the server. A no-op when no stop is parked.
defp force_finalize_stop(%{stopping: %{}, current: %{inflight: inflight}} = state) do
  kill_inflight(inflight)
  %{release_stop_waiters(state) | current: nil}
end

defp force_finalize_stop(state), do: state

# Hard stop used when restarting the server in place (no graceful drain).
#
# Any graceful stop that is still parked is settled here as well. Leaving
# `stopping` (and its timer) behind would let a later :DOWN or :drain_timeout
# finalize against the *next* server: it would clear `current` and, on
# timeout, kill that server's inflight requests.
defp stop_current_server(%{current: %{} = current} = state) do
  _ = :gen_tcp.close(current.listener)
  if is_pid(current.acceptor_pid), do: Process.exit(current.acceptor_pid, :normal)
  kill_inflight(current.inflight)
  %{release_stop_waiters(state) | current: nil}
end

defp stop_current_server(state), do: state

# Cancels the drain timer, replies :ok to every parked stop caller and clears
# `stopping`. Returns the state unchanged when no stop is parked.
defp release_stop_waiters(%{stopping: %{waiters: waiters, timer: timer}} = state) do
  # Cancelling a timer that already fired is harmless; a :drain_timeout that
  # is already queued is ignored later because `stopping` is nil by then.
  if is_reference(timer), do: Process.cancel_timer(timer)
  Enum.each(waiters, &GenServer.reply(&1, :ok))
  %{state | stopping: nil}
end

defp release_stop_waiters(state), do: state
|
||||
|
||||
# Forcibly terminates every tracked request task.
# `inflight` is enumerated as `{monitor_ref, pid}` pairs.
defp kill_inflight(inflight), do: Enum.each(inflight, &kill_tracked/1)

# Drops the monitor first (flushing a :DOWN that may already be queued) so
# the kill below does not produce a :DOWN message, then kills the task.
defp kill_tracked({monitor_ref, task_pid}) do
  Process.demonitor(monitor_ref, [:flush])
  if is_pid(task_pid), do: Process.exit(task_pid, :kill)
end
|
||||
|
||||
defp start_server(state, project_id, data_dir, owner_pid) do
|
||||
state = stop_current_server(state)
|
||||
maybe_allow_repo(owner_pid)
|
||||
@@ -442,7 +515,8 @@ defmodule BDS.Preview do
|
||||
port: @port,
|
||||
is_running: true,
|
||||
listener: listener,
|
||||
acceptor_pid: acceptor_pid
|
||||
acceptor_pid: acceptor_pid,
|
||||
inflight: %{}
|
||||
}
|
||||
|
||||
{{:ok, public_server(server)}, %{state | current: server}}
|
||||
|
||||
Reference in New Issue
Block a user