ProtonStream
A streaming API for OS processes that mirrors Elixir's Port module. ProtonStream
keeps programs, daemons, and applications launched from Erlang and Elixir contained
and well-behaved. This lightweight library kills OS processes if the Elixir process
running them crashes, and if you're running on Linux, it can use cgroups to prevent
many other shenanigans.
Some features:

- Streaming API that mirrors Elixir's Port module
- Bidirectional communication with stdin/stdout/stderr
- Set cgroup controls like thresholds on memory and CPU utilization
- Start OS processes as a different user or group
- Send SIGKILL to processes that aren't responsive to SIGTERM
- With cgroups, ensure that all children of launched processes have been killed too
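
Since the streaming API mirrors Elixir's Port module, it can help to see the built-in Port exchange it is modeled on. The sketch below uses only Elixir's standard library (no ProtonStream) and assumes a cat executable on the PATH; PortEcho is a hypothetical module name. It sends the same {pid, {:command, data}} and {pid, :close} tuples that the streaming API accepts and waits for the matching {port, {:data, data}} and {port, :closed} replies.

```elixir
defmodule PortEcho do
  # Round-trips `input` through `cat` with Elixir's built-in Port, using the
  # same {pid, {:command, data}} and {pid, :close} messages.
  def echo(input, timeout \\ 5_000) do
    cat = System.find_executable("cat") || raise "cat not found on PATH"
    port = Port.open({:spawn_executable, cat}, [:binary])

    # Write to the child's stdin.
    send(port, {self(), {:command, input}})

    # Wait for stdout; short input normally comes back in one message.
    output =
      receive do
        {^port, {:data, data}} -> data
      after
        timeout -> raise "no output from cat within #{timeout} ms"
      end

    # Close the port; the port replies with {port, :closed}.
    send(port, {self(), :close})

    receive do
      {^port, :closed} -> :ok
    after
      timeout -> :ok
    end

    output
  end
end
```

For a short line, cat's output normally arrives as a single {:data, data} message; longer output can be split across several messages, so real code should accumulate chunks.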
TL;DR
Add proton_stream to your project's mix.exs dependency list:
```elixir
def deps do
  [
    {:proton_stream, "~> 1.8"}
  ]
end
```

Streaming API
The primary API is ProtonStream.open/3, which starts a GenServer that manages an OS
process. The calling process becomes the "owner" and receives messages from it:
```elixir
{:ok, ps} = ProtonStream.open("cat", [])

# Send data to stdin
send(ps, {self(), {:command, "hello"}})

# Receive stdout
receive do
  {^ps, {:data, data}} -> IO.puts("Got: #{data}")
end

# Close when done
send(ps, {self(), :close})
```

Messages
Messages TO ProtonStream (from owner):

- {pid, {:command, binary}} - send data to the child's stdin
- {pid, :close} - close the port
- {pid, {:connect, new_pid}} - transfer ownership to another process

Messages FROM ProtonStream (to owner):

- {pid, {:data, data}} - stdout from the child process
- {pid, {:error, data}} - stderr from the child process
- {pid, :closed} - reply to a close request
- {pid, :connected} - reply to a connect request
- {:EXIT, pid, reason} - process termination (when trapping exits)
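
The receive block in the streaming example above waits forever and assumes the reply arrives as one {:data, data} message. A minimal sketch of a more defensive request/reply helper, assuming only the message shapes listed above, accumulates stdout chunks until a terminator (a newline by default) ends the output and gives up after an idle timeout. StreamRequest and request/4 are hypothetical names, not part of ProtonStream.

```elixir
defmodule StreamRequest do
  # Hypothetical helper, not part of ProtonStream. It relies only on the
  # owner messages listed above.

  # Sends `command` to stdin, then gathers stdout until the collected output
  # ends with `terminator`. `timeout` is an idle timeout in milliseconds and
  # restarts whenever a message arrives.
  def request(ps, command, terminator \\ "\n", timeout \\ 5_000) do
    send(ps, {self(), {:command, command}})
    collect(ps, terminator, timeout, "")
  end

  defp collect(ps, terminator, timeout, acc) do
    receive do
      {^ps, {:data, data}} ->
        acc = acc <> data

        if String.ends_with?(acc, terminator) do
          {:ok, acc}
        else
          collect(ps, terminator, timeout, acc)
        end

      {^ps, {:error, data}} ->
        # stderr is forwarded to our own stderr and not mixed into the reply.
        IO.write(:stderr, data)
        collect(ps, terminator, timeout, acc)

      # Only delivered when the caller traps exits.
      {:EXIT, ^ps, reason} ->
        {:error, {:exited, reason, acc}}
    after
      timeout -> {:error, {:timeout, acc}}
    end
  end
end
```

With bc started as in the bidirectional example, StreamRequest.request(ps, "2 + 2\n") should return {:ok, "4\n"} as long as bc answers with a single line.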
Bidirectional Communication
ProtonStream supports full bidirectional communication:
```elixir
{:ok, ps} = ProtonStream.open("bc", ["-q"])

send(ps, {self(), {:command, "2 + 2\n"}})

receive do
  {^ps, {:data, "4\n"}} -> :ok
end

send(ps, {self(), {:command, "10 * 10\n"}})

receive do
  {^ps, {:data, "100\n"}} -> :ok
end
```

Callback Module Mode
For integration with supervision trees, use ProtonStream.start_link/5 with a callback
module that implements the ProtonStream behaviour:
```elixir
defmodule MyWorker do
  use GenServer
  @behaviour ProtonStream

  def start_link(args) do
    ProtonStream.start_link(__MODULE__, "my_command", [], args, [])
  end

  @impl GenServer
  def init(args) do
    {:ok, %{buffer: "", args: args}}
  end

  # Accumulate stdout chunks in the state.
  @impl ProtonStream
  def handle_stdout(data, state) do
    {:noreply, %{state | buffer: state.buffer <> data}}
  end

  # stderr chunks usually carry their own newlines, so write them as-is.
  @impl ProtonStream
  def handle_stderr(data, state) do
    IO.write(:stderr, data)
    {:noreply, state}
  end

  # Stop the worker when the OS process exits.
  @impl ProtonStream
  def handle_exit(reason, state) do
    {:stop, reason, state}
  end
end
```

Add to your supervision tree:
```elixir
children = [
  {MyWorker, [some: :args]}
]

Supervisor.start_link(children, strategy: :one_for_one)
```

The callbacks handle_stdout/2, handle_stderr/2, and handle_exit/2 are all optional.
You can also implement GenServer.handle_call/3, GenServer.handle_cast/2, and GenServer.handle_info/2.
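
The MyWorker example above keeps appending stdout to a single buffer. A minimal sketch of a line-oriented alternative, assuming the handle_stdout/2 signature and {:noreply, state} return shown above, keeps complete lines and carries any partial line over to the next chunk; a handle_call/3 clause then exposes the lines. LineWorkerCallbacks is a hypothetical name, and the functions are plain functions (no use GenServer or @behaviour ProtonStream) so the sketch compiles without the dependency; in a real worker they would live in a module like MyWorker.

```elixir
defmodule LineWorkerCallbacks do
  # Hypothetical sketch: plain functions with the callback shapes used by
  # MyWorker, so it runs without ProtonStream installed.

  def init(_args), do: {:ok, %{buffer: "", lines: []}}

  # Complete lines go to `lines`; a trailing partial line stays in `buffer`
  # until the next chunk arrives.
  def handle_stdout(data, state) do
    {complete, rest} = split_lines(state.buffer <> data)
    {:noreply, %{state | buffer: rest, lines: state.lines ++ complete}}
  end

  # GenServer.handle_call/3 clause returning the lines seen so far.
  def handle_call(:lines, _from, state), do: {:reply, state.lines, state}

  # String.split/2 always returns at least one element; the last one is the
  # unterminated remainder ("" when the text ends with a newline).
  defp split_lines(text) do
    parts = String.split(text, "\n")
    {Enum.drop(parts, -1), List.last(parts)}
  end
end
```

Feeding "hel", "lo\nwor", and "ld\n" in turn leaves ["hello", "world"] in lines and an empty buffer, regardless of where the OS happened to split the output.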
FAQ
How do I watch stdout?
Use ProtonStream.open/3 and receive {pid, {:data, data}} messages:
```elixir
{:ok, ps} = ProtonStream.open("my_program", [])

# In a receive loop or GenServer handle_info:
receive do
  {^ps, {:data, data}} -> IO.write(data)
  {^ps, {:error, data}} -> IO.write(:stderr, data)
end
```

How do I send input to stdin?
Use the streaming API:
```elixir
{:ok, ps} = ProtonStream.open("cat", [])
send(ps, {self(), {:command, "hello world\n"}})
```

How do I stop a ProtonStream process?
Send a close message:
```elixir
send(ps, {self(), :close})
```

Background
The Erlang VM's port interface lets Elixir applications run external programs. This is important since it's not practical to rewrite everything in Elixir. Plus, if the program is long-running, like a daemon or a server, you can use Elixir to supervise it and restart it when it crashes. The catch is that the Erlang VM expects port processes to be well-behaved. As you'd expect, many useful programs don't quite meet the Erlang VM's expectations.
For example, let's say that you want to monitor a network connection and decide
that ping is the right tool. Here's how you could start ping in a process.
```elixir
iex> pid = spawn(fn -> System.cmd("ping", ["-i", "5", "localhost"], into: IO.stream(:stdio, :line)) end)
#PID<0.6116.0>
PING localhost (127.0.0.1): 56 data bytes
64 bytes from 127.0.0.1: icmp_seq=0 ttl=64 time=0.032 ms
64 bytes from 127.0.0.1: icmp_seq=1 ttl=64 time=0.077 ms
```

Now exit the Elixir process:
```elixir
iex> Process.exit(pid, :oops)
true
iex> :os.cmd(~c"ps -ef | grep ping") |> IO.puts
501 38820 38587 0 9:26PM ?? 0:00.02 /sbin/ping -i 5 localhost
```

As you can tell, ping is still running after the exit. That's because ping doesn't
pay attention to stdin, so it never notices the Erlang VM closing stdin to signal
that it should exit.
This is just one of the problems that proton_stream fixes.
Containment with cgroups
Even if you don't make use of any cgroup controller features, having your port process contained can be useful just to make sure that everything is cleaned up on exit, including any subprocesses.
To set this up, first create a cgroup with appropriate permissions:

```shell
sudo cgcreate -a $(whoami) -g memory,cpu:mycgroup
```
Then use the cgroup options:
```elixir
{:ok, ps} = ProtonStream.open("spawning_program", [],
  cgroup_controllers: ["cpu"],
  cgroup_base: "mycgroup"
)
```

On any error, if the Erlang VM closes the port, or if spawning_program exits,
proton_stream will kill all OS processes in the cgroup.
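
The cgroup_sets values in the following examples are strings written verbatim to cgroup files, so the arithmetic is left to the caller: 268435456 is 256 * 1024 * 1024 bytes (256 MiB), and a cpu.cfs_quota_us of 50000 against a cpu.cfs_period_us of 100000 allows half of one CPU. A small sketch that computes these tuples, with hypothetical helper names and assuming the cgroup v1 file names used in the examples (cgroup v2 uses memory.max and cpu.max instead):

```elixir
defmodule CgroupValues do
  # Hypothetical helpers that build {controller, file, value} tuples for the
  # :cgroup_sets option. File names are the cgroup v1 ones used in the
  # examples; values are strings because they are written to cgroup files.

  @bytes_per_mib 1024 * 1024

  # Memory cap in MiB -> memory.limit_in_bytes.
  def memory_limit_mib(mib) when is_integer(mib) and mib > 0 do
    [{"memory", "memory.limit_in_bytes", Integer.to_string(mib * @bytes_per_mib)}]
  end

  # Share of one CPU (0.5 = half a CPU) -> CFS period and quota. The cgroup
  # may use quota_us microseconds of CPU time every period_us microseconds.
  def cpu_limit(fraction, period_us \\ 100_000)
      when is_number(fraction) and fraction > 0 and is_integer(period_us) and period_us > 0 do
    quota_us = round(fraction * period_us)

    [
      {"cpu", "cpu.cfs_period_us", Integer.to_string(period_us)},
      {"cpu", "cpu.cfs_quota_us", Integer.to_string(quota_us)}
    ]
  end
end
```

The two results can be concatenated with ++ and passed as the cgroup_sets option, with both "memory" and "cpu" listed in cgroup_controllers.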
Limit memory
```elixir
ProtonStream.open("memory_hog", [],
  cgroup_controllers: ["memory"],
  cgroup_base: "mycgroup",
  # 268435456 bytes = 256 MiB
  cgroup_sets: [{"memory", "memory.limit_in_bytes", "268435456"}]
)
```

Limit CPU
```elixir
ProtonStream.open("cpu_hog", [],
  cgroup_controllers: ["cpu"],
  cgroup_base: "mycgroup",
  # At most 50 ms of CPU time per 100 ms period (half of one CPU)
  cgroup_sets: [
    {"cpu", "cpu.cfs_period_us", "100000"},
    {"cpu", "cpu.cfs_quota_us", "50000"}
  ]
)
```

stdio flow control
ProtonStream implements flow control to prevent the program's output from
overwhelming the Elixir process's mailbox. The :stdio_window option specifies
the maximum number of unacknowledged bytes allowed (default 10 KB).
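
To make the :stdio_window semantics concrete, here is a conceptual model of byte-window flow control. It is a sketch of the general technique, not ProtonStream's internal implementation: the producer may send a chunk only while the bytes sent but not yet acknowledged, plus the new chunk, stay within the window, and each acknowledgement frees space again. StdioWindow and its functions are hypothetical names.

```elixir
defmodule StdioWindow do
  # Conceptual model only: tracks bytes sent but not yet acknowledged
  # against a fixed window size.
  defstruct [:window, unacked: 0]

  def new(window) when is_integer(window) and window > 0,
    do: %__MODULE__{window: window}

  # The producer may send `size` more bytes only if they fit in the window.
  def can_send?(%__MODULE__{window: window, unacked: unacked}, size),
    do: unacked + size <= window

  # Record bytes handed to the consumer.
  def sent(%__MODULE__{} = state, size),
    do: %{state | unacked: state.unacked + size}

  # The consumer acknowledges `size` bytes, freeing window space.
  def acked(%__MODULE__{} = state, size),
    do: %{state | unacked: max(state.unacked - size, 0)}
end
```

In this model, while the window is full the producer has to hold output back, which is what bounds the number of bytes sitting in the consumer's mailbox.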
Development
To run the tests, install cgroup-tools (which provides cgcreate and cgget), create the test cgroup, and then run mix test:

```shell
sudo cgcreate -a $(whoami) -g memory,cpu:proton_stream_test
mix test
```
License
All original source code in this project is licensed under Apache-2.0.
Additionally, this project follows the REUSE recommendations and labels so that licensing and copyright are clear at the file level.
Exceptions to Apache-2.0 licensing are:
- Configuration and data files are licensed under CC0-1.0
- Documentation is CC-BY-4.0