ProtonStream behaviour (proton_stream v1.8.3)

View Source

ProtonStream provides a streaming API for OS processes that mirrors Elixir's Port module.

This library is derived from MuonTrap by Frank Hunleth, with a new streaming API and bidirectional stdin/stdout/stderr support.

ProtonStream can be used in two modes:

  1. Message-based mode - via open/3, where the calling process receives messages
  2. Callback module mode - via start_link/4, where a GenServer module handles events

Message-based Mode

Use open/3 to start a process. The calling process becomes the "owner" and receives messages about stdout, stderr, and process termination.

Messages TO ProtonStream (from owner)

  • {pid, {:command, binary}} - send data to child's stdin
  • {pid, :close} - close the port
  • {pid, {:connect, new_pid}} - transfer ownership to another process

Messages FROM ProtonStream (to owner)

  • {pid, {:data, data}} - stdout from child process
  • {pid, {:error, data}} - stderr from child process
  • {pid, :closed} - reply to close request
  • {pid, :connected} - reply to connect request
  • {:EXIT, pid, reason} - process termination (when trapping exits)

Example

{:ok, ps} = ProtonStream.open("cat", [])
send(ps, {self(), {:command, "hello"}})

receive do
  {^ps, {:data, data}} -> IO.puts("Got: #{data}")
end

send(ps, {self(), :close})

Callback Module Mode

For integration with supervision trees, use start_link/4 with a callback module that implements both GenServer and the ProtonStream behaviour.

Callbacks

The following callbacks must be implemented:

Example

defmodule MyWorker do
  use GenServer
  @behaviour ProtonStream

  def start_link(args) do
    ProtonStream.start_link(__MODULE__, "my_command", [], args)
  end

  @impl GenServer
  def init(args) do
    {:ok, %{buffer: "", args: args}}
  end

  @impl ProtonStream
  def handle_stdout(data, state) do
    {:noreply, %{state | buffer: state.buffer <> data}}
  end

  @impl ProtonStream
  def handle_stderr(data, state) do
    IO.puts(:stderr, data)
    {:noreply, state}
  end

  @impl ProtonStream
  def handle_exit(reason, state) do
    {:stop, reason, state}
  end
end

The module can then be added to a supervision tree:

children = [
  {MyWorker, [some: :args]}
]

Supervisor.start_link(children, strategy: :one_for_one)

Callback Return Values

GenServer Callbacks

The callback module may also implement GenServer.handle_call/3, GenServer.handle_cast/2, GenServer.handle_info/2, GenServer.handle_continue/2, and GenServer.terminate/2 to handle synchronous requests, asynchronous requests, messages, continuations, and cleanup.

Configuring cgroups

On most Linux distributions, use cgcreate to create a new cgroup:

sudo cgcreate -a $(whoami) -g memory,cpu:proton_stream

Then use the :cgroup_controllers and :cgroup_base options.

Summary

Functions

Returns a specification to start this module under a supervisor.

Close the process.

Send data to the process's stdin.

Transfer ownership to a new process.

Return the absolute path to the muontrap executable.

Open a new OS process and return a handle to it.

Return the OS process ID of the child process.

Types

t()

@type t() :: %ProtonStream{
  args: [binary()],
  buffer: binary(),
  cgroup_path: binary() | nil,
  command: binary(),
  owner: pid(),
  port: port() | nil,
  state: term()
}

Callbacks

handle_exit(reason, state)

(optional)
@callback handle_exit(reason :: term(), state :: term()) ::
  {:stop, reason :: term(), new_state :: term()}

handle_stderr(data, state)

(optional)
@callback handle_stderr(data :: binary(), state :: term()) ::
  {:noreply, new_state :: term()}
  | {:noreply, new_state :: term(),
     timeout() | :hibernate | {:continue, term()}}
  | {:stop, reason :: term(), new_state :: term()}

handle_stdout(data, state)

(optional)
@callback handle_stdout(data :: binary(), state :: term()) ::
  {:noreply, new_state :: term()}
  | {:noreply, new_state :: term(),
     timeout() | :hibernate | {:continue, term()}}
  | {:stop, reason :: term(), new_state :: term()}

Functions

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

See Supervisor.

close(server)

@spec close(pid()) :: :ok

Close the process.

This is equivalent to send(ps, {self(), :close}). The caller will receive {ps, :closed} when complete.

command(server, data)

@spec command(pid(), iodata()) :: :ok

Send data to the process's stdin.

This is equivalent to send(ps, {self(), {:command, data}}).

connect(server, new_owner)

@spec connect(pid(), pid()) :: :ok

Transfer ownership to a new process.

The current owner will receive {ps, :connected} and the new owner will start receiving data messages.

muontrap_path()

Return the absolute path to the muontrap executable.

open(command, args \\ [], opts \\ [])

@spec open(binary(), [binary()], keyword()) :: GenServer.on_start()

Open a new OS process and return a handle to it.

Returns {:ok, pid} on success or {:error, reason} on failure. The calling process becomes the owner and will receive messages.

Options

  • :cd - the directory to run the command in
  • :env - an enumerable of tuples containing environment key-value as binary
  • :arg0 - sets the command arg0
  • :cgroup_controllers - run the command under the specified cgroup controllers
  • :cgroup_base - create a temporary path under the specified cgroup path
  • :cgroup_path - explicitly specify a path to use
  • :cgroup_sets - set a cgroup controller parameter before running the command
  • :delay_to_sigkill - milliseconds before SIGKILL if SIGTERM doesn't work (default 500ms)
  • :uid - run the command using the specified uid or username
  • :gid - run the command using the specified gid or group
  • :stdio_window - flow control window size in bytes (default 10KB)

Example

{:ok, ps} = ProtonStream.open("echo", ["hello"])
receive do
  {^ps, {:data, "hello\n"}} -> :ok
end

os_pid(server)

@spec os_pid(pid()) :: non_neg_integer() | nil

Return the OS process ID of the child process.

start_link(module, command, args \\ [], init_arg \\ [], opts \\ [])

@spec start_link(module(), binary(), [binary()], term(), keyword()) ::
  GenServer.on_start()