ProtonStream behaviour (proton_stream v1.8.3)
View SourceProtonStream provides a streaming API for OS processes that mirrors Elixir's Port module.
This library is derived from MuonTrap by Frank Hunleth,
with a new streaming API and bidirectional stdin/stdout/stderr support.
ProtonStream can be used in two modes:
- Message-based mode - via
open/3, where the calling process receives messages - Callback module mode - via
start_link/4, where a GenServer module handles events
Message-based Mode
Use open/3 to start a process. The calling process becomes the "owner" and
receives messages about stdout, stderr, and process termination.
Messages TO ProtonStream (from owner)
{pid, {:command, binary}}- send data to child's stdin{pid, :close}- close the port{pid, {:connect, new_pid}}- transfer ownership to another process
Messages FROM ProtonStream (to owner)
{pid, {:data, data}}- stdout from child process{pid, {:error, data}}- stderr from child process{pid, :closed}- reply to close request{pid, :connected}- reply to connect request{:EXIT, pid, reason}- process termination (when trapping exits)
Example
{:ok, ps} = ProtonStream.open("cat", [])
send(ps, {self(), {:command, "hello"}})
receive do
{^ps, {:data, data}} -> IO.puts("Got: #{data}")
end
send(ps, {self(), :close})Callback Module Mode
For integration with supervision trees, use start_link/4 with a callback module
that implements both GenServer and the ProtonStream behaviour.
Callbacks
The following callbacks must be implemented:
GenServer.init/1- called when the process starts, returns initial statehandle_stdout/2- called when stdout data is receivedhandle_stderr/2- called when stderr data is receivedhandle_exit/2- called when the child process exits
Example
defmodule MyWorker do
use GenServer
@behaviour ProtonStream
def start_link(args) do
ProtonStream.start_link(__MODULE__, "my_command", [], args)
end
@impl GenServer
def init(args) do
{:ok, %{buffer: "", args: args}}
end
@impl ProtonStream
def handle_stdout(data, state) do
{:noreply, %{state | buffer: state.buffer <> data}}
end
@impl ProtonStream
def handle_stderr(data, state) do
IO.puts(:stderr, data)
{:noreply, state}
end
@impl ProtonStream
def handle_exit(reason, state) do
{:stop, reason, state}
end
endThe module can then be added to a supervision tree:
children = [
{MyWorker, [some: :args]}
]
Supervisor.start_link(children, strategy: :one_for_one)Callback Return Values
GenServer.init/1returns{:ok, state}or{:stop, reason}handle_stdout/2returns{:noreply, state}or{:stop, reason, state}handle_stderr/2returns{:noreply, state}or{:stop, reason, state}handle_exit/2returns{:stop, reason, state}
GenServer Callbacks
The callback module may also implement GenServer.handle_call/3,
GenServer.handle_cast/2, GenServer.handle_info/2, GenServer.handle_continue/2,
and GenServer.terminate/2 to handle synchronous requests, asynchronous requests,
messages, continuations, and cleanup.
Configuring cgroups
On most Linux distributions, use cgcreate to create a new cgroup:
sudo cgcreate -a $(whoami) -g memory,cpu:proton_stream
Then use the :cgroup_controllers and :cgroup_base options.
Summary
Functions
Returns a specification to start this module under a supervisor.
Close the process.
Send data to the process's stdin.
Transfer ownership to a new process.
Return the absolute path to the muontrap executable.
Open a new OS process and return a handle to it.
Return the OS process ID of the child process.
Types
Callbacks
Functions
Returns a specification to start this module under a supervisor.
See Supervisor.
@spec close(pid()) :: :ok
Close the process.
This is equivalent to send(ps, {self(), :close}).
The caller will receive {ps, :closed} when complete.
Send data to the process's stdin.
This is equivalent to send(ps, {self(), {:command, data}}).
Transfer ownership to a new process.
The current owner will receive {ps, :connected} and the new owner
will start receiving data messages.
Return the absolute path to the muontrap executable.
@spec open(binary(), [binary()], keyword()) :: GenServer.on_start()
Open a new OS process and return a handle to it.
Returns {:ok, pid} on success or {:error, reason} on failure.
The calling process becomes the owner and will receive messages.
Options
:cd- the directory to run the command in:env- an enumerable of tuples containing environment key-value as binary:arg0- sets the command arg0:cgroup_controllers- run the command under the specified cgroup controllers:cgroup_base- create a temporary path under the specified cgroup path:cgroup_path- explicitly specify a path to use:cgroup_sets- set a cgroup controller parameter before running the command:delay_to_sigkill- milliseconds before SIGKILL if SIGTERM doesn't work (default 500ms):uid- run the command using the specified uid or username:gid- run the command using the specified gid or group:stdio_window- flow control window size in bytes (default 10KB)
Example
{:ok, ps} = ProtonStream.open("echo", ["hello"])
receive do
{^ps, {:data, "hello\n"}} -> :ok
end
@spec os_pid(pid()) :: non_neg_integer() | nil
Return the OS process ID of the child process.