NimblePool behaviour (NimblePool v1.0.0)
NimblePool is a tiny resource-pool implementation.
Pools in the Erlang VM, and therefore in Elixir, are generally process-based: they manage a group of processes. The downside of said pools is that when they have to manage resources, such as sockets or ports, the additional process leads to overhead.
In such pools, you usually end up with one of two scenarios:
You invoke the pool manager, which returns the pooled process, which performs the operation on the socket or port for you, returning you the reply. This approach is non-optimal because all of the data sent and returned by the resource needs to be copied between processes.
You invoke the pool manager, which returns the pooled process, which gives you access to the resource. Then you can act directly on the resource, avoiding the data copying, but you need to keep the state of the resource in sync with the process.
NimblePool allows you to implement the second scenario without the addition of processes, which leads to a simpler and more efficient implementation. You should consider using NimblePool whenever you have to manage sockets, ports, or NIF resources and you want the client to perform one-off operations on them. For example, NimblePool is a good solution to manage HTTP 1 connections, ports that need to communicate with long-running programs, etc.
The downside of NimblePool is that, because all resources are under a single process, any resource management operation will happen on this single process, which is more likely to become a bottleneck. This can be addressed, however, by starting one NimblePool per scheduler and by doing scheduler-based dispatches.
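One way to picture the scheduler-based dispatch mentioned above is sketched below. It is only an illustration: the `SchedulerPools` module, its function names, and the `:port_pool` base name are hypothetical and not part of NimblePool. It assumes one pool is started per online scheduler and that each client picks the pool matching the scheduler it is currently running on.

```elixir
defmodule SchedulerPools do
  # Hypothetical helper, not part of NimblePool.

  # Builds one child spec per online scheduler, each under its own name.
  # Calling this requires the :nimble_pool dependency to be available.
  def child_specs(worker, base_name) do
    for index <- 1..System.schedulers_online() do
      name = pool_name(base_name, index)
      Supervisor.child_spec({NimblePool, worker: worker, name: name}, id: name)
    end
  end

  # Derives the registered name of the pool with the given index.
  def pool_name(base_name, index), do: :"#{base_name}_#{index}"

  # Picks the pool for the scheduler the caller is running on.
  # The id is wrapped so it always falls within 1..schedulers_online.
  def current_pool(base_name) do
    scheduler_id = :erlang.system_info(:scheduler_id)
    index = rem(scheduler_id - 1, System.schedulers_online()) + 1
    pool_name(base_name, index)
  end
end
```

A client would then call something like `NimblePool.checkout!(SchedulerPools.current_pool(:port_pool), :checkout, fun)`, so that contention is spread across several pool processes instead of one.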
NimblePool may not be a good option to manage processes. After all, the goal of NimblePool is to avoid creating processes for resources. If you already have a process, using a process-based pool such as poolboy will provide a better abstraction.
Finally, avoid using NimblePool to manage resources that support multiplexing, such as HTTP 2 connections. In fact, pools are not a good option to manage resources with multiplexing in general, as the pool removes the ability to multiplex.
Types of callbacks
NimblePool has two types of callbacks: worker callbacks and pool callbacks. The worker callbacks configure the behaviour of each worker, such as initialization, checkin and checkout. The pool callbacks configure general pool behaviour, such as initialization and queueing.
Examples
To use NimblePool, you must define a module that implements the pool worker logic, outlined in the NimblePool behaviour.
Port-based example
The example below keeps ports in the pool and checks them out on every command. Please read the docs for Port before using the approach below, especially with regard to zombie ports.
defmodule PortPool do
  @behaviour NimblePool

  @doc ~S"""
  Executes a given command against a port kept by the pool.

  First we start the port:

      iex> child = {NimblePool, worker: {PortPool, :cat}, name: PortPool}
      iex> Supervisor.start_link([child], strategy: :one_for_one)

  Now we can run commands against the pool of ports:

      iex> PortPool.command(PortPool, "hello\n")
      "hello\n"
      iex> PortPool.command(PortPool, "world\n")
      "world\n"

  """
  def command(pool, command, opts \\ []) do
    pool_timeout = Keyword.get(opts, :pool_timeout, 5000)
    receive_timeout = Keyword.get(opts, :receive_timeout, 15000)

    NimblePool.checkout!(pool, :checkout, fn {pid, _}, port ->
      send(port, {self(), {:command, command}})

      receive do
        {^port, {:data, data}} ->
          try do
            # Hand the port back to the pool process
            Port.connect(port, pid)
            {data, :ok}
          rescue
            _ -> {data, :close}
          end
      after
        receive_timeout ->
          exit(:receive_timeout)
      end
    end, pool_timeout)
  end

  @impl NimblePool
  def init_worker(:cat = pool_state) do
    path = System.find_executable("cat")
    port = Port.open({:spawn_executable, path}, [:binary, args: ["-"]])
    {:ok, port, pool_state}
  end

  @impl NimblePool
  # Transfer the port to the caller
  def handle_checkout(:checkout, {pid, _}, port, pool_state) do
    Port.connect(port, pid)
    {:ok, port, port, pool_state}
  end

  @impl NimblePool
  # We got it back
  def handle_checkin(:ok, _from, port, pool_state) do
    {:ok, port, pool_state}
  end

  def handle_checkin(:close, _from, _port, pool_state) do
    {:remove, :closed, pool_state}
  end

  @impl NimblePool
  # On terminate, effectively close it
  def terminate_worker(_reason, port, pool_state) do
    Port.close(port)
    {:ok, pool_state}
  end
end
HTTP1-based example
The pool below uses Mint for HTTP1 connections. It establishes connections eagerly. A better approach may be to establish connections lazily on checkout, as done by Finch, which is built on top of Mint and NimblePool.
defmodule HTTP1Pool do
  @behaviour NimblePool

  @doc ~S"""
  Executes a given command against a connection kept by the pool.

  First we start the connection:

      child = {NimblePool, worker: {HTTP1Pool, {:https, "elixir-lang.org", 443}}, name: HTTP1Pool}
      Supervisor.start_link([child], strategy: :one_for_one)

  Then we can access it:

      iex> HTTP1Pool.get(HTTP1Pool, "/")
      {:ok, %{status: 200, ...}}

  """
  def get(pool, path, opts \\ []) do
    pool_timeout = Keyword.get(opts, :pool_timeout, 5000)
    receive_timeout = Keyword.get(opts, :receive_timeout, 15000)

    NimblePool.checkout!(
      pool,
      :checkout,
      fn _from, conn ->
        with {:ok, conn, ref} <- Mint.HTTP1.request(conn, "GET", path, [], nil),
             {:ok, conn, result} <- receive_response([], conn, ref, %{}, receive_timeout) do
          {{:ok, result}, transfer_if_open(conn)}
        else
          # Both steps above fail with {:error, conn, error}
          {:error, conn, error} -> {{:error, error}, transfer_if_open(conn)}
        end
      end,
      pool_timeout
    )
  end

  defp transfer_if_open(conn) do
    if Mint.HTTP1.open?(conn) do
      {:ok, conn}
    else
      :closed
    end
  end

  defp receive_response([], conn, ref, response, timeout) do
    {:ok, conn, entries} = Mint.HTTP1.recv(conn, 0, timeout)
    receive_response(entries, conn, ref, response, timeout)
  end

  defp receive_response([entry | entries], conn, ref, response, timeout) do
    case entry do
      {kind, ^ref, value} when kind in [:status, :headers] ->
        response = Map.put(response, kind, value)
        receive_response(entries, conn, ref, response, timeout)

      {:data, ^ref, data} ->
        response = Map.update(response, :data, data, &(&1 <> data))
        receive_response(entries, conn, ref, response, timeout)

      {:done, ^ref} ->
        {:ok, conn, response}

      {:error, ^ref, error} ->
        {:error, conn, error}
    end
  end

  @impl NimblePool
  def init_worker({scheme, host, port} = pool_state) do
    parent = self()

    async = fn ->
      # TODO: Add back-off
      {:ok, conn} = Mint.HTTP1.connect(scheme, host, port, [])
      {:ok, conn} = Mint.HTTP1.controlling_process(conn, parent)
      conn
    end

    {:async, async, pool_state}
  end

  @impl NimblePool
  # Transfer the conn to the caller.
  # If we lost the connection, then we remove it to try again.
  def handle_checkout(:checkout, _from, conn, pool_state) do
    with {:ok, conn} <- Mint.HTTP1.set_mode(conn, :passive) do
      {:ok, conn, conn, pool_state}
    else
      _ -> {:remove, :closed, pool_state}
    end
  end

  @impl NimblePool
  # We got it back.
  def handle_checkin(state, _from, _old_conn, pool_state) do
    with {:ok, conn} <- state,
         {:ok, conn} <- Mint.HTTP1.set_mode(conn, :active) do
      {:ok, conn, pool_state}
    else
      # Covers both :closed from the client and {:error, _} from set_mode
      _ -> {:remove, :closed, pool_state}
    end
  end

  @impl NimblePool
  # If it is closed, drop it.
  def handle_info(message, conn) do
    case Mint.HTTP1.stream(conn, message) do
      {:ok, conn, _} -> {:ok, conn}
      {:error, _, _, _} -> {:remove, :closed}
      :unknown -> {:ok, conn}
    end
  end

  @impl NimblePool
  # On terminate, effectively close it.
  # This will succeed even if it was already closed or if we don't own it.
  def terminate_worker(_reason, conn, pool_state) do
    Mint.HTTP1.close(conn)
    {:ok, pool_state}
  end
end
Summary
Worker callbacks
Checks a worker in.
Checks a worker out.
Receives a message in the worker.
Handles pings due to inactivity on the worker.
Handles update instruction from checked out worker.
Initializes the worker.
Pool callbacks
Executed by the pool, whenever a request to checkout a worker is enqueued.
Initializes the pool.
Terminates a worker.
Functions
Checks out from the pool.
Defines a pool to be started under the supervision tree.
Starts a pool.
Stops a pool.
Sends an update instruction to the pool about the checked out worker.
Types
Worker callbacks
handle_checkin(client_state, from, worker_state, pool_state)
@callback handle_checkin(client_state(), from(), worker_state(), pool_state()) :: {:ok, worker_state(), pool_state()} | {:remove, user_reason(), pool_state()}
Checks a worker in.
It receives the client_state returned by the checkout!/4 anonymous function, and it must return either {:ok, worker_state, pool_state} or {:remove, reason, pool_state}.
Note this callback is synchronous and therefore will block the pool. Avoid performing long work in here; instead, do as much work as possible on the client.
Once the connection is checked in, it may immediately be handed to another client, without traversing any of the messages in the pool inbox.
This callback is optional.
handle_checkout(maybe_wrapped_command, from, worker_state, pool_state)
@callback handle_checkout(maybe_wrapped_command :: term(), from(), worker_state(), pool_state()) :: {:ok, client_state(), worker_state(), pool_state()} | {:remove, user_reason(), pool_state()} | {:skip, Exception.t(), pool_state()}
Checks a worker out.
It receives maybe_wrapped_command. The command is given to the checkout!/4 call and may optionally be wrapped by handle_enqueue/2. It must return either {:ok, client_state, worker_state, pool_state}, {:remove, reason, pool_state}, or {:skip, Exception.t(), pool_state}.
If :remove is returned, NimblePool will attempt to check out another worker.
If :skip is returned, NimblePool will skip the checkout, the client will raise the returned exception, and the worker will be left ready for the next checkout attempt.
Note this callback is synchronous and therefore will block the pool. Avoid performing long work in here; instead, do as much work as possible on the client.
Once the connection is checked out, the worker won't receive any messages targeted to handle_info/2.
@callback handle_info(message :: term(), worker_state()) :: {:ok, worker_state()} | {:remove, user_reason()}
Receives a message in the worker.
It receives the message and it must return either {:ok, worker_state} or {:remove, reason}.
Note this callback is synchronous and therefore will block the pool. Avoid performing long work in here.
This callback is optional.
@callback handle_ping( worker_state(), pool_state() ) :: {:ok, worker_state()} | {:remove, user_reason()} | {:stop, user_reason()}
Handles pings due to inactivity on the worker.
Executed whenever the idle worker periodic timer verifies that a worker has been idle on the pool for longer than the :worker_idle_timeout pool configuration (in milliseconds).
This callback must return one of the following values:
{:ok, worker_state}: Updates worker state.
{:remove, user_reason}: The pool will proceed to the standard worker termination defined in terminate_worker/3.
{:stop, user_reason}: The entire pool process will be terminated, and terminate_worker/3 will be called for every worker on the pool.
This callback is optional.
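The return values above can be illustrated with a minimal sketch. The `IdleAwarePool` module, the `%{pings: n}` worker state, and the limit of three pings are all assumptions made for the example. The worker tolerates a few idle pings and is then removed.

```elixir
defmodule IdleAwarePool do
  # Illustrative only: requires the :nimble_pool dependency to run as a pool.
  @behaviour NimblePool

  # Hypothetical limit: remove a worker on its third consecutive idle ping.
  @max_pings 3

  @impl NimblePool
  def init_worker(pool_state), do: {:ok, %{pings: 0}, pool_state}

  @impl NimblePool
  # A checkout means the worker is in use again, so reset its ping counter.
  def handle_checkout(:checkout, _from, worker, pool_state) do
    {:ok, worker, %{worker | pings: 0}, pool_state}
  end

  @impl NimblePool
  def handle_ping(%{pings: pings}, _pool_state) when pings + 1 >= @max_pings do
    {:remove, :idle}
  end

  def handle_ping(%{pings: pings} = worker, _pool_state) do
    {:ok, %{worker | pings: pings + 1}}
  end

  @impl NimblePool
  def terminate_worker(_reason, _worker, pool_state), do: {:ok, pool_state}
end

# Child spec enabling the idle timer; the 30 second timeout is arbitrary.
child = {NimblePool, worker: {IdleAwarePool, :no_arg}, lazy: true, worker_idle_timeout: 30_000}
```

Because the pool is lazy here, removed workers are only started again when a client needs them.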
Max idle pings
The :max_idle_pings pool option is useful to prevent sequential termination of a large number of workers. However, keep the following behaviours in mind when using it:
If you are not terminating workers with handle_ping/2, you may end up pinging only the same workers over and over again, because each cycle will ping only the first :max_idle_pings workers.
If you are terminating workers with handle_ping/2, the last worker may be terminated after up to worker_idle_timeout + worker_idle_timeout * ceil(number_of_workers/max_idle_pings) instead of 2 * worker_idle_timeout milliseconds of idle time.
For instance, consider a pool with 10 workers and a ping of 1 second. Given a negligible worker termination time and a worst-case scenario where all the workers go idle right after a verification cycle is started, then without max_idle_pings the last worker will be terminated in the next cycle (2 seconds), whereas with a max_idle_pings of 2 the last worker will be terminated only in the 5th cycle (6 seconds).
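The worst-case bound above can be checked with a small calculation. The `IdlePingMath` module is a hypothetical helper written only for this illustration. It evaluates worker_idle_timeout + worker_idle_timeout * ceil(number_of_workers/max_idle_pings) using integer arithmetic.

```elixir
defmodule IdlePingMath do
  # Hypothetical helper: worst-case idle time (in ms) before the last worker
  # is terminated, following the formula given in the text.
  def worst_case_ms(worker_idle_timeout, number_of_workers, max_idle_pings) do
    # Integer ceiling of number_of_workers / max_idle_pings
    cycles = div(number_of_workers + max_idle_pings - 1, max_idle_pings)
    worker_idle_timeout + worker_idle_timeout * cycles
  end
end

# 10 workers, 1 second timeout, at most 2 pings per cycle
IdlePingMath.worst_case_ms(1_000, 10, 2)
# => 6000

# Pinging all 10 workers per cycle matches the 2 * worker_idle_timeout case
IdlePingMath.worst_case_ms(1_000, 10, 10)
# => 2000
```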
Disclaimers
On lazy pools, if no worker is currently on the pool, the callback will never be called. Therefore you cannot rely on this callback to terminate empty lazy pools.
On non-lazy pools, if you return {:remove, user_reason}, you may end up terminating and initializing workers at the same time every idle verification cycle.
On large pools, if many resources go idle in the same cycle, you may end up terminating a large number of workers sequentially, which could lead to the pool being unable to fulfill requests. See the :max_idle_pings option to prevent this.
@callback handle_update(message :: term(), worker_state(), pool_state()) :: {:ok, worker_state(), pool_state()}
Handles update instruction from checked out worker.
See update/2 for more information.
This callback is optional.
@callback init_worker(pool_state()) :: {:ok, worker_state(), pool_state()} | {:async, (-> worker_state()), pool_state()}
Initializes the worker.
It receives the worker argument passed to start_link/1. It must return {:ok, worker_state, pool_state} or {:async, fun, pool_state}, where the fun is a zero-arity function that must return the worker state.
Note this callback is synchronous and therefore will block the pool. If you need to perform long initialization, consider using the {:async, fun, pool_state} return type.
Pool callbacks
@callback handle_enqueue(command :: term(), pool_state()) :: {:ok, maybe_wrapped_command :: term(), pool_state()} | {:skip, Exception.t(), pool_state()}
Executed by the pool, whenever a request to checkout a worker is enqueued.
The command argument should be treated as an opaque value, but it can be wrapped with some data to be used in handle_checkout/4.
It must return either {:ok, maybe_wrapped_command, pool_state} or {:skip, Exception.t(), pool_state} if checkout is to be skipped.
Note this callback is synchronous and therefore will block the pool. Avoid performing long work in here.
This callback is optional.
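As a minimal sketch of command wrapping, the hypothetical `TimestampedPool` below tags each enqueued command with the time it entered the queue, so that handle_checkout/4 can report how long the client waited. The module name, the `{command, enqueued_at}` shape, and the reference used as worker state are assumptions of this example, not conventions of NimblePool.

```elixir
defmodule TimestampedPool do
  # Illustrative only: requires the :nimble_pool dependency to run as a pool.
  @behaviour NimblePool

  @impl NimblePool
  def init_worker(pool_state), do: {:ok, make_ref(), pool_state}

  @impl NimblePool
  # Wrap the opaque command with the enqueue time (monotonic, in ms).
  def handle_enqueue(command, pool_state) do
    {:ok, {command, System.monotonic_time(:millisecond)}, pool_state}
  end

  @impl NimblePool
  # Wrapped command: hand the worker and the time spent queued to the client.
  def handle_checkout({:checkout, enqueued_at}, _from, worker, pool_state) do
    waited_ms = System.monotonic_time(:millisecond) - enqueued_at
    {:ok, {worker, waited_ms}, worker, pool_state}
  end

  # Defensive clause for a command that was not wrapped.
  def handle_checkout(:checkout, _from, worker, pool_state) do
    {:ok, {worker, 0}, worker, pool_state}
  end
end
```

The client function passed to checkout!/4 would then receive `{worker, waited_ms}` as its client state.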
@callback init_pool(init_arg()) :: {:ok, pool_state()} | :ignore | {:stop, reason :: any()}
Initializes the pool.
It receives the worker argument passed to start_link/1 and must return {:ok, pool_state} upon successful initialization, :ignore to exit normally, or {:stop, reason} to exit with reason and return {:error, reason}.
This is a good place to perform a registration, for example.
The returned pool_state is given to init_worker/1. By default, this callback simply returns the arguments given.
This callback is optional.
@callback terminate_worker( :DOWN | :timeout | :throw | :error | :exit | user_reason(), worker_state(), pool_state() ) :: {:ok, pool_state()}
Terminates a worker.
This callback is invoked with :DOWN whenever the client link breaks, with :timeout whenever the client times out, and with one of :throw, :error, :exit whenever the client crashes with one of the reasons above.
If at any point you return {:remove, reason}, the reason will also be given to terminate_worker/3. If any callback raises, the raised exception will be given as reason.
It receives the latest known worker_state, which may not be the latest state. For example, if a client checks out the state and crashes, we don't fully know the client_state, so the terminate_worker/3 callback needs to take such scenarios into account.
This callback is optional.
Functions
Checks out from the pool.
It expects a command, which will be passed to the handle_checkout/4 callback. The handle_checkout/4 callback will return a client state, which is given to the function.
The function receives two arguments, the pool {pid(), reference()} and the client_state.
The function must return a two-element tuple, where the first element is the return value for checkout!, and the second element is the updated client_state, which will be given as the first argument to handle_checkin/4.
checkout! also has an optional timeout value. This value is applied to the checkout operation itself. checkin happens asynchronously.
Defines a pool to be started under the supervision tree.
It accepts the same options as start_link/1 with the addition of :restart and :shutdown that control the "Child Specification".
Starts a pool.
Options
:worker - a {worker_mod, worker_init_arg} tuple with the worker module that implements the NimblePool behaviour and the worker initial argument. This argument is required.
:pool_size - how many workers in the pool. Defaults to 10.
:lazy - when true, workers are started lazily, only when necessary. Defaults to false.
:worker_idle_timeout - timeout in milliseconds to tag a worker as idle. If not nil, starts a periodic timer on the same frequency that will ping all idle workers using the optional handle_ping/2 callback. Defaults to no timeout.
:max_idle_pings - defines a limit to the number of workers that can be pinged for each cycle of the optional handle_ping/2 callback. Defaults to no limit. See handle_ping/2 for more details.
Stops a pool.
Sends an update instruction to the pool about the checked out worker.
This must be called inside the checkout! callback with the from value given to checkout!.
This is useful to update the pool state before effectively checking the state in, which is handy when transferring resources that require two steps.
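A minimal sketch of the update flow follows. The `CounterPool` module, its integer worker state, and the `{:used, n}` message are hypothetical and chosen only to keep the example small. The client calls update/2 from inside the checkout! function, and the pool applies the message in handle_update/3 before the worker is checked back in.

```elixir
defmodule CounterPool do
  # Illustrative only: requires the :nimble_pool dependency to run as a pool.
  @behaviour NimblePool

  # Checks out a worker, tells the pool it was used once, and returns the
  # usage count seen at checkout time.
  def run(pool) do
    NimblePool.checkout!(pool, :checkout, fn from, count ->
      NimblePool.update(from, {:used, 1})
      {count, :ok}
    end)
  end

  @impl NimblePool
  def init_worker(pool_state), do: {:ok, 0, pool_state}

  @impl NimblePool
  def handle_checkout(:checkout, _from, count, pool_state) do
    {:ok, count, count, pool_state}
  end

  @impl NimblePool
  # Applies the update sent by the client while the worker is checked out.
  def handle_update({:used, n}, count, pool_state) do
    {:ok, count + n, pool_state}
  end

  @impl NimblePool
  # Keeps the worker state as last updated by handle_update/3.
  def handle_checkin(:ok, _from, count, pool_state) do
    {:ok, count, pool_state}
  end
end
```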