Conn.Pool v0.2.1 Conn protocol
High level abstraction that represents connection in the most common sense.
Implementation can use any transport to connect to any remote or VM-local
resource: Agent
, GenServer
, different http-APIs, etc.
Callbacks
init/2
— initialize connection with given arguments;resource/1
— resource for that interaction is made;methods!/1
— methods supported while makingConn.call/3
;call/3
— interact via selected method;parse/2
— parse data in respect of connection context.
Example
Let’s implement simple generic text-base RPC protocol. Client can send server
requests in the form of :COMMAND
or :COMMAND:ARGS
to execute COMMAND
with given ARGS
or without them. There is a special request :COMMANDS
to
ask for a list of all available commands, response for it is ok:COMMAND1,CMD2,…
.
Server may reply with ok:COMMAND1,CMD2,…
, ok
, ok:DATA
or err:REASON
,
where REASON
could be notsupported
(command), badarg
, parse
, or
arbitrary string.
Now simple test conn will look like:
defmodule TextConn, do: defstruct [:res]
Let’s implement protocol described above:
defimpl Conn, for: TextConn do
def init(conn, server) do
if Process.alive?(server) do
{:ok, %TextConn{res: server}}
else
{:error, :dead, :closed, conn}
end
end
def resource(%_{res: server}), do: server
def methods!(%_{res: server}=conn) do
send(server, {self(), ":COMMANDS"})
receive do
"ok:" <> cmds ->
String.split(",")
end
end
def call(%_{res: server} = conn, cmd, args \\ nil) when is_binary(args) do
if Process.alive?(server) do
send(server, {self(), args && ":#{cmd}:#{args}" || ":#{cmd}"})
receive do
"ok" ->
{:noreply, conn}
"ok:" <> reply ->
{:reply, reply, conn}
"err:notsupported" ->
{:error, :notsupported, conn}
"err:"<>reason ->
# Suggest 50 ms timeout.
{:error, reason, 50, conn}
after
5000 ->
{:error, :timeout, conn}
end
else
{:error, :closed}
end
end
— it’s the part of the implementation used on the client side. Let’s move to parsing:
def parse(conn, ""), do: :ok
def parse(conn, ":COMMANDS" <> _), do: {:ok, :methods, ""}
def parse(conn, ":" <> data) do
case Regex.named_captures(~r[(?<cmd>[^:]+)(?::(?<args>.*))?], data) do
%{"cmd" => cmd, "args" => args} ->
{:ok, {:call, cmd, args}, ""}
_ ->
{:error, {:parse, data}, ""}
end
end
def parse(conn, malformed) do
{:error, {:parse, malformed}, ""}
end
end
That’s it. Now all that left is to spawn corresponding server that will
interact with client using above protocol. Let us implement server that holds
integer value and supports commands INC
, GET
and STOP
.
iex> defmodule Server do
...> def loop(state \\ 0) do
...> receive do
...> {client, data} ->
...> case Conn.parse(%TextConn{}, data) do
...> {:error, {:parse, _}, _rest} ->
...> send(client, "err:parse")
...> loop(state)
...>
...> {:ok, :methods, _rest} ->
...> send(client, "ok:INC,GET,STOP")
...> loop(state)
...>
...> {:ok, {:call, "INC", ""}, _rest} ->
...> send(client, "ok:#{state+1}")
...> loop(state+1)
...>
...> {:ok, {:call, "GET", ""}, _rest} ->
...> send(client, "ok:#{state}")
...> loop(state)
...>
...> {:ok, {:call, "STOP", ""}, _rest} ->
...> send(client, "ok")
...>
...> {:ok, {:call, _, ""}, _rest} ->
...> send(client, "err:notsupported")
...> loop(state)
...>
...> {:ok, {:call, _, _}, _rest} ->
...> send(client, "err:badarg")
...> loop(state)
...> end
...> end
...> end
...> end
...>
...>
iex> res = spawn_link(&Server.loop/0)
iex> {:ok, pool} = Conn.Pool.start_link()
iex> Conn.Pool.init(pool, %TextConn{}, res)
iex> Conn.Pool.call(pool, res, "GET")
{:ok, "0"}
iex> Conn.Pool.call(pool, res, "INC")
{:ok, "1"}
iex> Conn.Pool.call(pool, res, "DEC")
{:error, :method}
iex> Conn.Pool.call(pool, res, "INC", :badarg)
{:error, "badarg"}
iex> Conn.Pool.call(pool, res, "STOP")
:ok
iex> Conn.Pool.call(pool, res, "GET")
{:error, :resource}
TextConn
could be used to connect to any server that implements above
protocol.
Link to this section Summary
Functions
Interacts using given connection, method and data
Initialize connection, args could be provided
Methods of interactions available for connection. Can be http-methods: :get
,
:put
, :post
, etc., or user defined arbitrary terms :info
, :say
,
:ask
, {:method, 1}
and so on. They represents types of interactions can be
done using given connection
Parse data in context of given connection
Resource is an arbitrary term (for ex., some pid). Connection represents interaction with resource
Link to this section Types
info() :: %Conn{ closed: boolean(), conn: any(), extra: any() | nil, init_args: any() | nil, last_call: pos_integer() | :never, last_init: pos_integer() | :never, methods: [method()], revive: boolean() | :force, stats: %{required(method()) => {non_neg_integer(), pos_integer()}}, timeout: timeout(), ttl: pos_integer() | :infinity, unsafe: boolean() }
Link to this section Functions
call(Conn.t(), Conn.method(), any()) :: {:noreply, Conn.t()} | {:noreply, timeout(), Conn.t()} | {:reply, reply(), Conn.t()} | {:reply, reply(), timeout(), Conn.t()} | {:error, :closed} | {:error, :timeout | :notsupported | reason(), Conn.t()} | {:error, :timeout | :notsupported | reason(), timeout() | :closed, Conn.t()}
Interacts using given connection, method and data.
If call succeed, returns
{:noreply, conn}
— call could be repeated immediately;{:noreply, timeout, conn}
— call could be repeated aftertimeout
ms;{:noreply, :infinity | :closed, conn}
— call should not be repeated as it will return{:error, :closed}
until reopen happend (viainit/2
);{:reply, reply, conn}
, wherereply
is the response data and call could be repeated immediately;{:reply, reply, timeout, conn}
when suggested to repeat call no sooner thantimeout
ms;{:reply, reply, :infinity | :closed, conn}
wheninit/2
should be called prior to any other interactions.
If call fails, returns
{:error, :closed}
— because connection is closed (useinit/2
to reopen);{:error, :timeout, conn}
— because call took too long;{:error, :notsupported, conn}
— because method is not supported;{:error, reason, conn}
wherereason
is an arbitrary term and call could be repeated immediately;{:error, reason, timeout, conn}
suggested to repeat call no sooner thantimeout
ms;{:error, reason, :infinity | :closed, conn}
when call should be repeated only after connection is reopened withinit/2
.
Examples
iex> {:ok, conn} = Conn.init(%Conn.Agent{}, fn -> 42 end)
iex> {:reply, 42, ^conn} = Conn.call(conn, :get, & &1)
iex> {:noreply, :closed, ^conn} = Conn.call(conn, :stop)
iex> Conn.call(conn, :get, & &1)
{:error, :closed}
Initialize connection, args could be provided.
In case of succeed, returns
{:ok, conn}
, whereconn
could be used immediately;{:ok, timeout, conn}
, where suggested to waittimeout
ms before useconn
.
If initialization fails, returns
{:error, :timeout, conn}
when call took too long, and it is suggested to repeat it immediately;{:error, reason, conn}
when suggested to repeat call immediately;{:error, reason, timeout, conn}
when suggested to repeatinit/2
call aftertimeout
ms.
Examples
iex> {:ok, conn1} = Conn.init(%Conn.Agent{}, fn -> 42 end)
iex> {:ok, conn2} = Conn.init(%Conn.Agent{}, res: Conn.resource(conn1))
iex> Conn.resource(conn1) == Conn.resource(conn2)
true
Methods of interactions available for connection. Can be http-methods: :get
,
:put
, :post
, etc., or user defined arbitrary terms :info
, :say
,
:ask
, {:method, 1}
and so on. They represents types of interactions can be
done using given connection.
Returns
or raise.
Examples
iex> Conn.methods!(%Conn.Agent{})
[:get, :get_and_update, :update, :stop]
Parse data in context of given connection.
Returns
On success:
{:ok, {:call, method, payload}, rest of the data}
;{:ok, :methods, rest of the data}
.
On error:
{:error, :notimplemented}
if parse is not implemented;{:error, :needmoredata}
if parsing needs more data;{:error, {:parse, data that is malformed}, rest of the data}
;{:error, reason}
for any other error while parsing.
If parse/2
function will not be used return {:error, :notimplemented}
.
See example in the head of the module docs.
Resource is an arbitrary term (for ex., some pid). Connection represents interaction with resource.
Examples
iex> {:ok, conn1} = Conn.init(%Conn.Agent{}, fn -> 42 end)
iex> {:ok, conn2} = Conn.init(%Conn.Agent{}, res: Conn.resource(conn1))
iex> Conn.resource(conn1) == Conn.resource(conn2)
true
iex> {:ok, agent} = Agent.start_link(fn -> 42 end)
iex> {:ok, conn1} = Conn.init(%Conn.Agent{}, res: agent)
iex> {:ok, conn2} = Conn.init(%Conn.Agent{}, res: agent)
iex> Conn.resource(conn1) == Conn.resource(conn2)
true