View Source Stdio behaviour (stdio v0.4.4)

Stream standard I/O from system processes.

Overview

Stdio manages system processes. The process standard output (stdout) and standard error (stderr) are represented as Streams of stdio/0 tuples. pipe!/4 reads standard input (stdin) from a binary stream.

Streaming stdout/stderr

A simple example of creating a stream using Stdio.stream!/1:

iex> Stdio.stream!("echo hello; echo world >&2") |> Enum.to_list()
[stdout: "hello\n", stderr: "world\n", exit_status: 0]

Piping to stdin

An example of using a system process to transform a stream using Stdio.pipe!/2:

iex> ["hello\n"] |> Stdio.pipe!("sed -u 's/hello/bye/'") |> Enum.to_list()
[stdout: "bye\n", exit_status: 0]

Note

Process output may be buffered. The pipe example unbuffers the output by using the -u option of sed(1).

Privileges

Warning

Some behaviours, notably Stdio.Container and Stdio.Jail, require the system supervisor (not the beam process!) to be running as the root user. See setuid/0 for instructions for set up.

Behaviours may change the root filesystem for the process. The default chroot(2) directory hierarchy can be created by running:

iex> Stdio.Container.make_chroot_tree!()
:ok

Process Capabilities and Namespaces

Stdio behaviours can restrict system process capabilities or isolate the process using namespaces or jails.

The default implementation uses the standard fork(2)'ing process model.

Other implementations are Stdio.Container (Linux namespaces(7)), Stdio.Rootless (Linux user_namespaces(7)) and Stdio.Jail (FreeBSD jail(2)

iex> Stdio.stream!("id -u", Stdio.Rootless) |> Enum.to_list()
[stdout: "65534\n", exit_status: 0]

Supervising System Processes

Stdio processes can be supervised and restarted by a Task.Supervisor:

Supervisor.start_link(
  [{Task.Supervisor, name: Init.TaskSupervisor}],
  strategy: :one_for_one
)

To create and supervise a system process:

require Logger

Task.Supervisor.start_child(
  Init.TaskSupervisor,
  fn ->
    Stdio.stream!("nc -l 4545")
    |> Stream.each(fn
      {:stdout, e} ->
        Logger.info(e)

      {:stderr, e} ->
        Logger.error(e)

      e ->
        Logger.warn("#{inspect(e)}")
    end)
    |> Stream.run()
  end,
  restart: :permanent
)

In another shell, connect to port 4545 and send some data:

$ nc localhost 4545
test
123
^C

$ nc localhost 4545
netcat was restarted by the supervisor!
^C

In iex:

05:40:26.277 [info]  test
05:40:27.669 [info]  123
05:40:28.044 [warning] {:exit_status, 0}
05:40:52.929 [info]  netcat was restarted by the supervisor!
05:40:53.451 [warning] {:exit_status, 0}

Summary

Types

Configuration options for callbacks

Phases are functions defining how a process is forked, configured and supervised.

Tuples containing the process stdout, stderr and termination status.

t()

Callbacks

Function to fork(2) a system subprocess from the supervisor created by the task/1 callback.

Function run on operation error.

Function run during stream termination.

A sequence of Stdio.Op.t()/0 instructions for a system process.

Function to create a system supervisor for the Stdio stream.

Functions

Pipe data into the standard input of a system process.

Terminate descendents of a supervisor process.

Terminate descendents of a process.

Request system supervisor runs as root.

Stream standard I/O from system processes.

Use a previously created supervisor process.

Types

@type config() ::
  {:net, :none | :host}
  | {:environ, [String.t()]}
  | {:uid, non_neg_integer()}
  | {:gid, non_neg_integer()}
  | {:groups, [non_neg_integer()]}
  | {:fstab, [String.t()]}
  | {:path, Path.t()}
  | {:hostname, String.t()}
  | {:setuid, boolean()}
  | {:priority, -20..19}
  | {:rlimit,
     {:rlimit_core
      | :rlimit_cpu
      | :rlimit_fsize
      | :rlimit_nofile
      | :rlimit_nproc, %{cur: integer(), max: integer()}}}
  | {Keyword.key(), Keyword.value()}

Configuration options for callbacks:

net

Defines the network capability for behaviours with support for network namespaces.

  • none: process does not have access to network

  • host: process shares the host's network namespace

environ

Set the process environment using a list of = separated strings.

iex> Stdio.stream!(["/usr/bin/env"], Stdio.Process, environ: ["TERM=dumb", "PATH=/bin"]) |> Enum.to_list()
[stdout: "TERM=dumb\nPATH=/bin\n", exit_status: 0]

uid

Requires setuid/0

Set the UID of the subprocess. If not set, a UID is generated by the behaviour.

gid

Requires setuid/0

Set the GID of the subprocess. If not set, a GID is generated by the behaviour.

groups

Requires setuid/0

Set a list of additional groups (a list of GIDs) for the subprocess. By default, additional groups are dropped.

fstab

Requires setuid/0

A list of directories to bind mount inside of the container namespace. Relative paths are resolved relative to the application priv directory.

path

May require setuid/0

Sets the working directory of the subprocess. Behaviours changing the root directory use path as the root.

hostname

Requires setuid/0

Sets the hostname of the process.

setuid

If set to false (the default), drops privileges before running the command.

priority

Sets the process priority (default: 0).

rlimit

A map containing setrlimit(2) settings. The default disables core dumps.

@type phase() ::
  {:task, (Keyword.t() -> {:ok, t()} | {:error, :prx.posix()})}
  | {:init,
     (Keyword.t() ->
        (init :: :prx.task() ->
           {:ok, pipeline :: pipeline()} | {:error, :prx.posix()}))}
  | {:ops, (Keyword.t() -> [Stdio.Op.t()])}
  | {:onerror, (Keyword.t() -> (sh :: :prx.task() -> any()))}
  | {:onexit, (Keyword.t() -> (Stdio.ProcessTree.t() -> boolean()))}

Phases are functions defining how a process is forked, configured and supervised.

Phases can be implemented as a behaviour or as a list of functions.

stateDiagram-v2
    direction LR
    [*] --> Supervisor: fork(2)
    Supervisor --> Subprocess: fork(2)
    Subprocess --> Terminate: stream
    Terminate --> [*]
    state Supervisor {
        direction LR
        [*] --> task
        task --> [*]
    }
    state Subprocess {
        state result <<choice>>
        direction LR
        [*] --> init
        init --> ops
        ops --> result
        result --> [*]: ok
        result --> onerror: {error, posix()}
    }
    state Terminate {
        direction LR
        onexit --> [*]
    }

task

Returns a :prx.task/0 system process configured to act as a system process supervisor.

init

Forks and initializes a subprocess of the system process supervisor. When using namespaces, this process will be PID 1 or the container init process.

ops

Provides a sequence of system calls to run on the subprocess.

onerror

Function called to clean up if any of the operations defined in ops/1 fail.

onexit

Function called when the stream is closed.

@type pipeline() :: [%{task: :prx.task(), pid: :prx.pid_t()}]
@type stdio() ::
  {:stdout, binary()}
  | {:stderr, binary()}
  | {:exit_status, integer()}
  | {:termsig, atom()}

Tuples containing the process stdout, stderr and termination status.

  • :stdout: standard output

  • :stderr: standard error

  • :exit_status: process has exited with status value

  • :termsig: the termination signal if the process exited due to a signal

@type t() :: %Stdio{
  atexit:
    :shutdown
    | :noshutdown
    | :noreap
    | (init :: :prx.task(), pipeline :: pipeline(), map() -> any()),
  init: :prx.task() | :noproc,
  state: map()
}

Callbacks

@callback init(Keyword.t()) :: (:prx.task() -> {:ok, pipeline()} | {:error, :prx.posix()})

Function to fork(2) a system subprocess from the supervisor created by the task/1 callback.

In a PID namespace, this process will be the init process or PID 1.

@callback onerror(Keyword.t()) :: (sh :: :prx.task() -> any())

Function run on operation error.

onerror/1 runs if any of the operations defined in ops/1 returns an error. An error is an error tuple:

{:error, :prx.posix()}

The return value of onerror/1 is ignored. Exceptions within the callback are not handled by default and may terminate the linked process.

The supervisor process for the task can be found by calling :prx.parent/1. For example, to signal the process group:

supervisor = :prx.parent(sh)
:prx.kill(supervisor, 0, :SIGTERM)
@callback onexit(Keyword.t()) :: (Stdio.ProcessTree.t() -> boolean())

Function run during stream termination.

onexit/1 runs when the stream is closed. The function can perform clean up and signal any lingering processes. Returns true if subprocesses were found to be running during cleanup.

@callback ops(Keyword.t()) :: [Stdio.Op.t()]

A sequence of Stdio.Op.t()/0 instructions for a system process.

Operations are defined in prx but any module can be called by specifying the module name.

@callback task(Keyword.t()) :: {:ok, t()} | {:error, atom()}

Function to create a system supervisor for the Stdio stream.

Create a task using :prx.fork/0 and configured as a process supervisor.

See supervise/1 for the default operations.

Functions

Link to this function

pipe!(stream, command, behaviour \\ Stdio.Process, config \\ [])

View Source
@spec pipe!(
  Enumerable.t(),
  String.t()
  | [String.t(), ...]
  | {arg0 :: String.t(), argv :: [String.t(), ...]},
  module() | [phase()] | {module(), [phase()]},
  task: [config()],
  init: [config()],
  ops: [config()],
  onerror: [config()],
  onexit: [config()]
) :: Enumerable.t()

Pipe data into the standard input of a system process.

Input is a stream of binaries. Empty binaries are ignored.

See stream!/3 for arguments.

Examples

iex> Stream.unfold(1000, fn 0 -> nil; n -> {"#{n}\n", n - 1} end)
...> |> Stdio.pipe!("grep --line-buffered 442")
...> |> Enum.to_list()
[stdout: "442\n", exit_status: 0]

iex> Stdio.stream!("echo test; kill $$")
iex> |> Stream.transform(<<>>, &Stdio.Stream.stdout_to_stderr/2)
iex> |> Stdio.pipe!("sed -u 's/t/_/g'")
iex> |> Enum.to_list()
[stdout: "_es_\n", exit_status: 0]

iex> [<<>>, <<"a">>, <<>>] |> Stdio.pipe!("cat") |> Enum.to_list()
[stdout: "a", exit_status: 0]
Link to this function

reap(pstree, signal \\ :SIGKILL)

View Source
@spec reap(Stdio.ProcessTree.t(), atom()) :: :ok

Terminate descendents of a supervisor process.

Link to this function

reap(pstree, pid, signal)

View Source
@spec reap(Stdio.ProcessTree.t(), :prx.pid_t(), atom()) :: :ok

Terminate descendents of a process.

reap signals subprocesses of a process identified by PID.

Background and daemonized subprocesses will also be terminated if supported by the platform.

Request system supervisor runs as root.

Some system calls require the process to be UID 0. These calls typically are operations like mounting filesystems or creating new namespaces (excluding user namespaces such as Stdio.Rootless) or putting a process into a jail(2) or a chroot(2).

Warning

Enabling privileges is a global setting: all system supervisor processes will run as UID 0.

Setuid Binary

Probably the simplest method for production use is adding the setuid bit to the prx binary (see :prx_drv.progname/0):

  • create a group with privileges to run the supervisor (optional)

      # example: create an elixir group
      groupadd elixir
  • find the location of the binary

      :prx_drv.progname()
  • set the setuid bit

      sudo chown root:elixir /path/to/priv/prx
      sudo chmod 4750 /path/to/priv/prx

Using a Setuid Exec Chain

prx can be configured to use a setuid helper program such as sudo(8) or doas(1).

Stdio.setuid/0 is a noop if the prx binary is setuid.

Before launching any supervisors, run:

 Stdio.setuid()

sudo(8)

    $ sudo visudo
    elixiruser ALL = NOPASSWD: /path/to/prx/priv/prx
    Defaults!/path/to/prx/priv/prx !requiretty

doas(8)

    permit nopass setenv { ENV PS1 SSH_AUTH_SOCK } :elixirgroup

Errors

shell process exited with reason: {:error, {:error, :eagain}}

Check prx is executable:

$ ./deps/prx/priv/prx -h
prx 0.40.5
usage: prx -c <path> [<options>]

error attempting to fork subprocess: insufficient permissions (eperm)

Verify prx can be run using sudo:

$ sudo ./deps/prx/priv/prx -h
prx 0.40.5
usage: prx -c <path> [<options>]
Link to this function

stream!(command, behaviour \\ Stdio.Process, config \\ [])

View Source
@spec stream!(
  String.t()
  | [String.t(), ...]
  | {arg0 :: String.t(), argv :: [String.t(), ...]},
  module() | [phase()] | {module(), [phase()]},
  task: [config()],
  init: [config()],
  ops: [config()],
  onexit: [config()],
  onerror: [config()]
) :: Enumerable.t()

Stream standard I/O from system processes.

command

Commands are strings interpreted by /bin/sh or list of strings (an argv) passed directly to execve(2).

Warning

When using an argv, $PATH is not consulted. The path to the executable must be provided as the first argument.

behaviour

The process behaviour can be customized by passing in:

  • a module implementing the callbacks

  • a keyword list with functions of type phase/0

  • a tuple consisting of a module and a list of phase/0 functions to override the module callbacks

config

For configuration options, see config/0.

Examples

iex> Stdio.stream!("echo test") |> Enum.to_list()
[stdout: "test\n", exit_status: 0]

iex> Stdio.stream!(["/bin/echo", "test"]) |> Enum.to_list()
[stdout: "test\n", exit_status: 0]

iex> Stdio.stream!("ip --oneline -4 addr show dev lo", Stdio.Container) |> Enum.to_list()
[exit_status: 0]

iex> Stdio.stream!("ip --oneline -4 addr show dev lo", Stdio.Container, net: :host) |> Enum.to_list()
[
    stdout: "1: lo    inet 127.0.0.1/8 scope host lo\\       valid_lft forever preferred_lft forever\n",
    exit_status: 0
]

iex> Stdio.stream!("pwd", {Stdio.Process, ops: fn _ -> [{:chdir, ["/tmp"]}] end}) |> Enum.to_list()
[stdout: "/tmp\n", exit_status: 0]
Link to this function

supervise(init, atexit \\ :shutdown, filter \\ {:allow, [:close, :exit, :fork, :getpid, :kill, :setcpid, :sigaction, :clone, :procctl]})

View Source
@spec supervise(
  :prx.task(),
  :shutdown
  | :noshutdown
  | :noreap
  | (init :: :prx.task(), pipeline :: pipeline(), state :: map() -> any()),
  [:prx.call()] | {:allow, [:prx.call()]} | {:deny, [:prx.call()]}
) :: {:ok, t()} | {:error, :prx.posix()}

Configure a task to be a system process supervisor.

The task:

  • becomes an init process (dependent on platform)

  • sets the process name to supervise

  • enables flow control for child stdout/stderr

  • sets the shutdown/reaper behaviour

  • terminates child processes if beam exits

  • restricts the task to a limited set of calls required for supervision

Link to this function

supervisor(atexit \\ :shutdown, filter \\ {:allow, [:close, :exit, :fork, :getpid, :kill, :setcpid, :sigaction, :clone, :procctl]})

View Source
@spec supervisor(
  :shutdown
  | :noshutdown
  | :noreap
  | (init :: :prx.task(), pipeline :: pipeline(), state :: map() -> any()),
  [:prx.call()] | {:allow, [:prx.call()]} | {:deny, [:prx.call()]}
) :: {:ok, t()} | {:error, :prx.posix()}

Create a supervisor task

Returns a Stdio.t/0 configured to act as a system process supervisor.

Link to this function

with_supervisor(module, supervisor)

View Source
@spec with_supervisor(module(), t()) :: Keyword.t()

Use a previously created supervisor process.

The supervisor is not shutdown when the stream exits.

Examples

iex> {:ok, supervisor} = Stdio.supervisor(:noshutdown)
iex> Stdio.stream!("echo test", Stdio.with_supervisor(Stdio.Process, supervisor)) |> Enum.to_list()
[stdout: "test\n", exit_status: 0]
iex> Stdio.stream!("echo test", Stdio.with_supervisor(Stdio.Process, supervisor)) |> Enum.to_list()
[stdout: "test\n", exit_status: 0]
iex> :prx.stop(supervisor.init)
graph LR
    B([beam]) --> S[Supervise]
    S -.-> I0[[sh]]
    S --> I1[[sh]]
    I0 -.-> C0[[echo test]]
    I1 --> C1[[echo test]]