feat: more clear definition and first base implementation for lua
Co-authored-by: Copilot <copilot@github.com>
This commit is contained in:
@@ -6,7 +6,10 @@ defmodule BDS.Application do
|
||||
@impl true
|
||||
def start(_type, _args) do
|
||||
children = [
|
||||
BDS.Repo
|
||||
BDS.Repo,
|
||||
BDS.Scripting.JobStore,
|
||||
{Task.Supervisor, name: BDS.Scripting.TaskSupervisor},
|
||||
BDS.Scripting.JobSupervisor
|
||||
]
|
||||
|
||||
opts = [strategy: :one_for_one, name: BDS.Supervisor]
|
||||
|
||||
108
lib/bds/scripting.ex
Normal file
108
lib/bds/scripting.ex
Normal file
@@ -0,0 +1,108 @@
|
||||
defmodule BDS.Scripting do
|
||||
@moduledoc """
|
||||
Facade for the configured user-script runtime.
|
||||
"""
|
||||
|
||||
alias BDS.Scripting.Runtime
|
||||
|
||||
@type job_status :: :queued | :running | :completed | :failed | :cancelled
|
||||
@type job_snapshot :: %{
|
||||
id: String.t(),
|
||||
status: job_status(),
|
||||
progress: map(),
|
||||
result: term() | nil,
|
||||
error: term() | nil,
|
||||
inserted_at: DateTime.t(),
|
||||
started_at: DateTime.t() | nil,
|
||||
finished_at: DateTime.t() | nil
|
||||
}
|
||||
|
||||
@spec runtime() :: module()
|
||||
def runtime do
|
||||
Application.fetch_env!(:bds, :scripting)
|
||||
|> Keyword.fetch!(:runtime)
|
||||
end
|
||||
|
||||
@spec validate(String.t()) :: :ok | {:error, term()}
|
||||
def validate(source) when is_binary(source) do
|
||||
runtime().validate(source)
|
||||
end
|
||||
|
||||
@spec execute(String.t(), String.t(), [term()], [Runtime.execution_option()]) ::
|
||||
{:ok, term()} | {:error, term()}
|
||||
def execute(source, entrypoint, args \\ [], opts \\ [])
|
||||
when is_binary(source) and is_binary(entrypoint) and is_list(args) and is_list(opts) do
|
||||
runtime().execute(source, entrypoint, args, opts)
|
||||
end
|
||||
|
||||
@spec start_job(String.t(), String.t(), [term()], [Runtime.execution_option()]) ::
|
||||
{:ok, job_snapshot()} | {:error, term()}
|
||||
def start_job(source, entrypoint, args \\ [], opts \\ [])
|
||||
when is_binary(source) and is_binary(entrypoint) and is_list(args) and is_list(opts) do
|
||||
job_id = "script-job-" <> Integer.to_string(System.unique_integer([:positive, :monotonic]))
|
||||
|
||||
job = %{
|
||||
id: job_id,
|
||||
status: :queued,
|
||||
progress: %{},
|
||||
result: nil,
|
||||
error: nil,
|
||||
inserted_at: DateTime.utc_now(),
|
||||
started_at: nil,
|
||||
finished_at: nil
|
||||
}
|
||||
|
||||
:ok = BDS.Scripting.JobStore.put_job(job)
|
||||
|
||||
child_spec =
|
||||
{BDS.Scripting.JobRunner,
|
||||
job_id: job_id,
|
||||
runtime: runtime(),
|
||||
source: source,
|
||||
entrypoint: entrypoint,
|
||||
args: args,
|
||||
opts: batch_job_defaults(opts)}
|
||||
|
||||
case DynamicSupervisor.start_child(BDS.Scripting.JobSupervisor, child_spec) do
|
||||
{:ok, _pid} -> {:ok, BDS.Scripting.JobStore.fetch_job!(job_id)}
|
||||
{:error, reason} ->
|
||||
:ok =
|
||||
BDS.Scripting.JobStore.update_job(job_id, %{
|
||||
status: :failed,
|
||||
error: reason,
|
||||
finished_at: DateTime.utc_now()
|
||||
})
|
||||
|
||||
{:error, reason}
|
||||
end
|
||||
end
|
||||
|
||||
@spec get_job(String.t()) :: job_snapshot() | nil
|
||||
def get_job(job_id) when is_binary(job_id) do
|
||||
BDS.Scripting.JobStore.fetch_job(job_id)
|
||||
end
|
||||
|
||||
@spec cancel_job(String.t()) :: :ok | {:error, :not_found | :not_running}
|
||||
def cancel_job(job_id) when is_binary(job_id) do
|
||||
case BDS.Scripting.JobStore.runner_for(job_id) do
|
||||
nil ->
|
||||
case BDS.Scripting.JobStore.fetch_job(job_id) do
|
||||
nil -> {:error, :not_found}
|
||||
_job -> {:error, :not_running}
|
||||
end
|
||||
|
||||
pid -> BDS.Scripting.JobRunner.cancel(pid)
|
||||
end
|
||||
end
|
||||
|
||||
defp batch_job_defaults(opts) do
|
||||
config = Application.fetch_env!(:bds, :scripting)
|
||||
|
||||
defaults = [
|
||||
timeout: Keyword.get(config, :job_timeout, :infinity),
|
||||
max_reductions: Keyword.get(config, :job_max_reductions, :none)
|
||||
]
|
||||
|
||||
Keyword.merge(defaults, opts)
|
||||
end
|
||||
end
|
||||
121
lib/bds/scripting/job_runner.ex
Normal file
121
lib/bds/scripting/job_runner.ex
Normal file
@@ -0,0 +1,121 @@
|
||||
defmodule BDS.Scripting.JobRunner do
|
||||
@moduledoc false
|
||||
|
||||
use GenServer
|
||||
|
||||
def start_link(opts) do
|
||||
GenServer.start_link(__MODULE__, opts)
|
||||
end
|
||||
|
||||
def cancel(pid) when is_pid(pid) do
|
||||
GenServer.call(pid, :cancel)
|
||||
end
|
||||
|
||||
@impl true
|
||||
def init(opts) do
|
||||
state = %{
|
||||
job_id: Keyword.fetch!(opts, :job_id),
|
||||
runtime: Keyword.fetch!(opts, :runtime),
|
||||
source: Keyword.fetch!(opts, :source),
|
||||
entrypoint: Keyword.fetch!(opts, :entrypoint),
|
||||
args: Keyword.get(opts, :args, []),
|
||||
opts: Keyword.get(opts, :opts, []),
|
||||
task_pid: nil,
|
||||
task_ref: nil,
|
||||
completed?: false,
|
||||
cancelled?: false
|
||||
}
|
||||
|
||||
Process.flag(:trap_exit, true)
|
||||
:ok = BDS.Scripting.JobStore.attach_runner(state.job_id, self())
|
||||
{:ok, state, {:continue, :start_job}}
|
||||
end
|
||||
|
||||
@impl true
|
||||
def handle_continue(:start_job, state) do
|
||||
:ok =
|
||||
BDS.Scripting.JobStore.update_job(state.job_id, %{
|
||||
status: :running,
|
||||
started_at: DateTime.utc_now()
|
||||
})
|
||||
|
||||
runner = self()
|
||||
|
||||
task =
|
||||
Task.Supervisor.async_nolink(BDS.Scripting.TaskSupervisor, fn ->
|
||||
state.runtime.execute(
|
||||
state.source,
|
||||
state.entrypoint,
|
||||
state.args,
|
||||
Keyword.put(state.opts, :on_progress, fn event ->
|
||||
send(runner, {:job_progress, event})
|
||||
end)
|
||||
)
|
||||
end)
|
||||
|
||||
{:noreply, %{state | task_pid: task.pid, task_ref: task.ref}}
|
||||
end
|
||||
|
||||
@impl true
|
||||
def handle_call(:cancel, _from, state) do
|
||||
if is_pid(state.task_pid) do
|
||||
Process.exit(state.task_pid, :kill)
|
||||
end
|
||||
|
||||
:ok =
|
||||
BDS.Scripting.JobStore.update_job(state.job_id, %{
|
||||
status: :cancelled,
|
||||
finished_at: DateTime.utc_now()
|
||||
})
|
||||
|
||||
:ok = BDS.Scripting.JobStore.detach_runner(state.job_id)
|
||||
{:stop, :normal, :ok, %{state | cancelled?: true}}
|
||||
end
|
||||
|
||||
@impl true
|
||||
def handle_info({:job_progress, progress}, state) do
|
||||
:ok = BDS.Scripting.JobStore.update_job(state.job_id, %{progress: progress})
|
||||
{:noreply, state}
|
||||
end
|
||||
|
||||
def handle_info({ref, result}, %{task_ref: ref} = state) do
|
||||
Process.demonitor(ref, [:flush])
|
||||
|
||||
unless state.cancelled? do
|
||||
attrs =
|
||||
case result do
|
||||
{:ok, value} ->
|
||||
%{status: :completed, result: value, finished_at: DateTime.utc_now()}
|
||||
|
||||
{:error, reason} ->
|
||||
%{status: :failed, error: reason, finished_at: DateTime.utc_now()}
|
||||
end
|
||||
|
||||
:ok = BDS.Scripting.JobStore.update_job(state.job_id, attrs)
|
||||
:ok = BDS.Scripting.JobStore.detach_runner(state.job_id)
|
||||
end
|
||||
|
||||
{:stop, :normal, %{state | completed?: true}}
|
||||
end
|
||||
|
||||
def handle_info({:DOWN, ref, :process, _pid, reason}, %{task_ref: ref} = state) do
|
||||
cond do
|
||||
state.completed? or state.cancelled? ->
|
||||
{:stop, :normal, state}
|
||||
|
||||
reason == :normal ->
|
||||
{:noreply, state}
|
||||
|
||||
true ->
|
||||
:ok =
|
||||
BDS.Scripting.JobStore.update_job(state.job_id, %{
|
||||
status: :failed,
|
||||
error: reason,
|
||||
finished_at: DateTime.utc_now()
|
||||
})
|
||||
|
||||
:ok = BDS.Scripting.JobStore.detach_runner(state.job_id)
|
||||
{:stop, :normal, state}
|
||||
end
|
||||
end
|
||||
end
|
||||
78
lib/bds/scripting/job_store.ex
Normal file
78
lib/bds/scripting/job_store.ex
Normal file
@@ -0,0 +1,78 @@
|
||||
defmodule BDS.Scripting.JobStore do
|
||||
@moduledoc false
|
||||
|
||||
use GenServer
|
||||
|
||||
def start_link(_opts) do
|
||||
GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
|
||||
end
|
||||
|
||||
def put_job(job) when is_map(job) do
|
||||
GenServer.call(__MODULE__, {:put_job, job})
|
||||
end
|
||||
|
||||
def update_job(job_id, attrs) when is_binary(job_id) and is_map(attrs) do
|
||||
GenServer.call(__MODULE__, {:update_job, job_id, attrs})
|
||||
end
|
||||
|
||||
def attach_runner(job_id, pid) when is_binary(job_id) and is_pid(pid) do
|
||||
GenServer.call(__MODULE__, {:attach_runner, job_id, pid})
|
||||
end
|
||||
|
||||
def detach_runner(job_id) when is_binary(job_id) do
|
||||
GenServer.call(__MODULE__, {:detach_runner, job_id})
|
||||
end
|
||||
|
||||
def fetch_job(job_id) when is_binary(job_id) do
|
||||
GenServer.call(__MODULE__, {:fetch_job, job_id})
|
||||
end
|
||||
|
||||
def fetch_job!(job_id) when is_binary(job_id) do
|
||||
case fetch_job(job_id) do
|
||||
nil -> raise KeyError, key: job_id, term: :jobs
|
||||
job -> job
|
||||
end
|
||||
end
|
||||
|
||||
def runner_for(job_id) when is_binary(job_id) do
|
||||
GenServer.call(__MODULE__, {:runner_for, job_id})
|
||||
end
|
||||
|
||||
@impl true
|
||||
def init(_state) do
|
||||
{:ok, %{jobs: %{}, runners: %{}}}
|
||||
end
|
||||
|
||||
@impl true
|
||||
def handle_call({:put_job, %{id: job_id} = job}, _from, state) do
|
||||
next_state = put_in(state, [:jobs, job_id], job)
|
||||
{:reply, :ok, next_state}
|
||||
end
|
||||
|
||||
def handle_call({:update_job, job_id, attrs}, _from, state) do
|
||||
next_state = update_in(state, [:jobs, job_id], fn
|
||||
nil -> nil
|
||||
job -> Map.merge(job, attrs)
|
||||
end)
|
||||
|
||||
{:reply, :ok, next_state}
|
||||
end
|
||||
|
||||
def handle_call({:attach_runner, job_id, pid}, _from, state) do
|
||||
next_state = put_in(state, [:runners, job_id], pid)
|
||||
{:reply, :ok, next_state}
|
||||
end
|
||||
|
||||
def handle_call({:detach_runner, job_id}, _from, state) do
|
||||
next_state = update_in(state.runners, &Map.delete(&1, job_id))
|
||||
{:reply, :ok, %{state | runners: next_state}}
|
||||
end
|
||||
|
||||
def handle_call({:fetch_job, job_id}, _from, state) do
|
||||
{:reply, Map.get(state.jobs, job_id), state}
|
||||
end
|
||||
|
||||
def handle_call({:runner_for, job_id}, _from, state) do
|
||||
{:reply, Map.get(state.runners, job_id), state}
|
||||
end
|
||||
end
|
||||
14
lib/bds/scripting/job_supervisor.ex
Normal file
14
lib/bds/scripting/job_supervisor.ex
Normal file
@@ -0,0 +1,14 @@
|
||||
defmodule BDS.Scripting.JobSupervisor do
|
||||
@moduledoc false
|
||||
|
||||
use DynamicSupervisor
|
||||
|
||||
def start_link(_opts) do
|
||||
DynamicSupervisor.start_link(__MODULE__, :ok, name: __MODULE__)
|
||||
end
|
||||
|
||||
@impl true
|
||||
def init(:ok) do
|
||||
DynamicSupervisor.init(strategy: :one_for_one)
|
||||
end
|
||||
end
|
||||
134
lib/bds/scripting/lua.ex
Normal file
134
lib/bds/scripting/lua.ex
Normal file
@@ -0,0 +1,134 @@
|
||||
defmodule BDS.Scripting.Lua do
|
||||
@moduledoc """
|
||||
Lua runtime adapter backed by Luerl.
|
||||
|
||||
Execution starts from a sandboxed Lua state. Host capabilities are explicit
|
||||
and opt-in.
|
||||
"""
|
||||
|
||||
@behaviour BDS.Scripting.Runtime
|
||||
|
||||
@impl true
|
||||
def validate(source) when is_binary(source) do
|
||||
case :luerl.load(source, :luerl_sandbox.init()) do
|
||||
{:ok, _chunk, _state} ->
|
||||
:ok
|
||||
|
||||
{:error, errors, warnings} ->
|
||||
{:error, {:compile_error, %{errors: errors, warnings: warnings}}}
|
||||
|
||||
{:lua_error, error, _state} ->
|
||||
{:error, {:lua_error, error}}
|
||||
end
|
||||
end
|
||||
|
||||
@impl true
|
||||
def execute(source, entrypoint, args, opts)
|
||||
when is_binary(source) and is_binary(entrypoint) and is_list(args) and is_list(opts) do
|
||||
with {:ok, state} <- initial_state(opts),
|
||||
{:ok, state} <- put_args(state, args),
|
||||
{:ok, result, _state} <- run_entrypoint(source, entrypoint, state, opts) do
|
||||
{:ok, unwrap_result(result)}
|
||||
end
|
||||
end
|
||||
|
||||
defp initial_state(opts) do
|
||||
state = :luerl_sandbox.init()
|
||||
capabilities = Keyword.get(opts, :capabilities, %{})
|
||||
|
||||
with {:ok, state} <- :luerl.set_table_keys_dec(["bds"], %{}, state),
|
||||
{:ok, state} <- install_progress_callback(state, Keyword.get(opts, :on_progress)),
|
||||
{:ok, state} <- install_capabilities(state, capabilities) do
|
||||
{:ok, state}
|
||||
end
|
||||
end
|
||||
|
||||
defp install_progress_callback(state, nil), do: {:ok, state}
|
||||
|
||||
defp install_progress_callback(state, callback) when is_function(callback, 1) do
|
||||
progress_function = fn args, current_state ->
|
||||
decoded_args = :luerl.decode_list(args, current_state)
|
||||
|
||||
progress_event =
|
||||
case decoded_args do
|
||||
[payload | _] when is_map(payload) -> payload
|
||||
[payload | _] -> normalize_progress_payload(payload)
|
||||
[] -> %{}
|
||||
end
|
||||
|
||||
callback.(progress_event)
|
||||
:luerl.encode_list([true], current_state)
|
||||
end
|
||||
|
||||
case :luerl.set_table_keys_dec(["bds", "report_progress"], progress_function, state) do
|
||||
{:ok, next_state} -> {:ok, next_state}
|
||||
error -> {:error, {:progress_callback_install_failed, error}}
|
||||
end
|
||||
end
|
||||
|
||||
defp install_progress_callback(_state, callback), do: {:error, {:invalid_progress_callback, callback}}
|
||||
|
||||
defp install_capabilities(state, capabilities) when capabilities in [%{}, []], do: {:ok, state}
|
||||
|
||||
defp install_capabilities(state, capabilities) when is_map(capabilities) do
|
||||
Enum.reduce_while(capabilities, {:ok, state}, fn {name, function}, {:ok, current_state} ->
|
||||
path = ["bds", to_string(name)]
|
||||
|
||||
case :luerl.set_table_keys_dec(path, function, current_state) do
|
||||
{:ok, next_state} -> {:cont, {:ok, next_state}}
|
||||
error -> {:halt, {:error, {:capability_install_failed, path, error}}}
|
||||
end
|
||||
end)
|
||||
end
|
||||
|
||||
defp install_capabilities(_state, capabilities), do: {:error, {:invalid_capabilities, capabilities}}
|
||||
|
||||
defp normalize_progress_payload(payload) when is_list(payload) do
|
||||
if Enum.all?(payload, &match?({key, _value} when is_binary(key) or is_atom(key), &1)) do
|
||||
Map.new(payload, fn {key, value} -> {to_string(key), value} end)
|
||||
else
|
||||
%{value: payload}
|
||||
end
|
||||
end
|
||||
|
||||
defp normalize_progress_payload(payload), do: %{value: payload}
|
||||
|
||||
defp put_args(state, args) do
|
||||
case Luerl.set_table_keys_dec(state, ["__bds_args__"], args) do
|
||||
{:ok, next_state} -> {:ok, next_state}
|
||||
error -> {:error, {:argument_encoding_failed, error}}
|
||||
end
|
||||
end
|
||||
|
||||
defp run_entrypoint(source, entrypoint, state, opts) do
|
||||
script =
|
||||
IO.iodata_to_binary([
|
||||
source,
|
||||
"\nreturn ",
|
||||
entrypoint,
|
||||
"(table.unpack(__bds_args__))\n"
|
||||
])
|
||||
|
||||
case :luerl_sandbox.run(script, sandbox_flags(opts), state) do
|
||||
{:ok, result, next_state} -> {:ok, result, next_state}
|
||||
{:lua_error, error, _state} -> {:error, {:lua_error, error}}
|
||||
{:error, {:reductions, count}} -> {:error, {:reductions_exceeded, count}}
|
||||
{:error, :timeout} -> {:error, :timeout}
|
||||
{:error, reason} -> {:error, reason}
|
||||
end
|
||||
end
|
||||
|
||||
defp sandbox_flags(opts) do
|
||||
config = Application.fetch_env!(:bds, :scripting)
|
||||
|
||||
%{
|
||||
max_time: Keyword.get(opts, :timeout, Keyword.fetch!(config, :timeout)),
|
||||
max_reductions: Keyword.get(opts, :max_reductions, Keyword.fetch!(config, :max_reductions)),
|
||||
spawn_opts: Keyword.get(opts, :spawn_opts, [])
|
||||
}
|
||||
end
|
||||
|
||||
defp unwrap_result([]), do: nil
|
||||
defp unwrap_result([value]), do: value
|
||||
defp unwrap_result(values), do: values
|
||||
end
|
||||
24
lib/bds/scripting/runtime.ex
Normal file
24
lib/bds/scripting/runtime.ex
Normal file
@@ -0,0 +1,24 @@
|
||||
defmodule BDS.Scripting.Runtime do
|
||||
@moduledoc """
|
||||
Behaviour for user-script runtimes hosted by bDS.
|
||||
|
||||
The runtime boundary is intentionally narrow: syntax validation and
|
||||
bounded entrypoint execution.
|
||||
"""
|
||||
|
||||
@type source :: String.t()
|
||||
@type entrypoint :: String.t()
|
||||
@type args :: [term()]
|
||||
@type progress_event :: map()
|
||||
@type progress_callback :: (progress_event() -> any())
|
||||
@type execution_option ::
|
||||
{:timeout, non_neg_integer() | :infinity}
|
||||
| {:max_reductions, pos_integer() | :none}
|
||||
| {:spawn_opts, [term()]}
|
||||
| {:on_progress, progress_callback()}
|
||||
| {:capabilities, map()}
|
||||
|
||||
@callback validate(source()) :: :ok | {:error, term()}
|
||||
@callback execute(source(), entrypoint(), args(), [execution_option()]) ::
|
||||
{:ok, term()} | {:error, term()}
|
||||
end
|
||||
Reference in New Issue
Block a user