MeshxRpc (MeshxRpc v0.1.1)
RPC client and server.
MeshxRpc can be considered an alternative to the Erlang/OTP :erpc and :rpc modules.
Major differences between MeshxRpc and the native :erpc module:
- MeshxRpc uses a custom binary communication protocol, so it doesn't depend on Erlang distribution and can be used with any user-provided TCP transport solution, especially a service mesh data plane,
- MeshxRpc restricts the execution scope of incoming requests to a single module associated with the given RPC server. This way the user has full control over the RPC server functionality exposed to remote RPC clients, instead of allowing unrestricted access to the remote node, as is the case with the OTP :erpc module.
MeshxRpc features:
- connection pooling,
- user customizable serialization functions,
- traffic chunking into smaller blocks of configurable maximum size to avoid blocking socket IO,
- optional transmission error detection with user-configurable, asynchronously executed message block checksum functions,
- rich per-request telemetry metrics,
- request integrity protection with sequence numbers and node/connection references,
- primitive access control with optional preshared keys,
- load balancing and high availability provided by service mesh data plane,
- ability to run multiple RPC servers and/or clients on a single node.
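The traffic chunking listed above (bounded by the :blk_max_size option described under Common configuration, default 16384 bytes) boils down to cutting a serialized binary into blocks of at most a given size. The sketch below only illustrates that idea; ChunkSketch and split/2 are hypothetical names, not MeshxRpc internals.

```elixir
defmodule ChunkSketch do
  # Hypothetical illustration: split `data` into consecutive blocks of at most
  # `max_size` bytes. Concatenating the blocks restores the original binary.
  @spec split(binary(), pos_integer()) :: [binary()]
  def split(data, max_size) when is_binary(data) and is_integer(max_size) and max_size > 0 do
    do_split(data, max_size, [])
  end

  # Empty payload produces no blocks.
  defp do_split(<<>>, _max_size, acc), do: Enum.reverse(acc)

  # Last (possibly shorter) block.
  defp do_split(data, max_size, acc) when byte_size(data) <= max_size,
    do: Enum.reverse([data | acc])

  # Take exactly `max_size` bytes and continue with the rest.
  defp do_split(data, max_size, acc) do
    <<block::binary-size(max_size), rest::binary>> = data
    do_split(rest, max_size, [block | acc])
  end
end
```

With the default block size, a 40 000 byte payload would travel as three blocks: two of 16384 bytes and one of 7232 bytes.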
MeshxRpc is designed to work primarily as a mesh service and doesn't offer data encryption, authorization or authentication mechanisms, as those are natively provided by the service mesh environment.
Installation
Add :meshx_rpc and optionally :meshx_consul, :ranch and :poolboy to the application dependencies:
# mix.exs
def deps do
  [
    {:meshx_rpc, "~> 0.1.0"},
    # if using :meshx_consul service mesh adapter add:
    {:meshx_consul, "~> 0.1.0"},
    # if running RPC client(s) add:
    {:poolboy, "~> 1.5"},
    # if running RPC server(s) using ranch 1.8.0 add:
    # {:ranch, "~> 1.8.0"}
    # if running RPC server(s) using ranch 2.0 add instead:
    {:ranch, "~> 2.0", override: true}
  ]
end
Usage
To simplify the examples, the same mix project is used to implement both the RPC server and the RPC client.
Server and client nodes are selected with the custom command line argument rpc_server?:
- start server: iex --erl "-rpc_server? true",
- start client: iex --erl "-rpc_server? false".
Example 1. Shared Unix Domain Socket.
Requires ranch version 2.0.
Implement the RPC server module Example1.Server and the client module Example1.Client:
# lib/server.ex
defmodule Example1.Server do
  use MeshxRpc.Server,
    address: {:uds, "/tmp/meshx.sock"},
    telemetry_prefix: [:example1, __MODULE__]
  def echo(args), do: args
  def ping(_args), do: :pong
end
# lib/client.ex
defmodule Example1.Client do
  use MeshxRpc.Client,
    address: {:uds, "/tmp/meshx.sock"},
    telemetry_prefix: [:example1, __MODULE__],
    pool_opts: [size: 1, max_overflow: 0]
  def echo(args), do: call(:echo, args)
end
Both the client and the server use the same UDS socket at "/tmp/meshx.sock".
The number of RPC client pool workers is limited to one to reduce the number of telemetry events logged to the terminal.
The RPC client and server are started in the application supervision tree using their respective child specifications, Example1.Client.child_spec() and Example1.Server.child_spec():
# lib/example1/application.ex
defmodule Example1.Application do
  use Application
  @impl true
  def start(_type, _args) do
    children =
      if rpc_server?() do
        MeshxRpc.attach_telemetry([:example1, Example1.Server])
        [Example1.Server.child_spec()]
      else
        MeshxRpc.attach_telemetry([:example1, Example1.Client])
        [Example1.Client.child_spec()]
      end
    Supervisor.start_link(children, strategy: :one_for_one, name: Example1.Supervisor)
  end
  defp rpc_server?() do
    case :init.get_argument(:rpc_server?) do
      {:ok, [['true']]} -> true
      _ -> false
    end
  end
end
Launch two terminals and start the RPC server node in the first terminal:
iex --erl "-start_epmd false" --erl "-rpc_server? true" -S mix
Start the RPC client node in the second terminal:
iex --erl "-start_epmd false" --erl "-rpc_server? false" -S mix
In both terminals a similar telemetry event should be logged; here is the client side:
# [:example1, Example1.Client, :hsk] -> :ok
# local: %{conn_ref: "VPRihQ", node_ref: "nonode@nohost", svc_ref: "Elixir.Example1.Client"}
# remote: %{conn_ref: "wfyMQQ", node_ref: "nonode@nohost", svc_ref: "Elixir.Example1.Server"}
# ...
The first line says that a successful handshake (:hsk) was executed by [:example1, Example1.Client]. The next two lines describe the local and remote endpoints. See MeshxRpc.attach_telemetry/2 for details.
Execute RPC requests in the client terminal:
# telemetry events removed
iex(1)> Example1.Client.call(:ping)
:pong
iex(2)> Example1.Client.echo("hello world")
"hello world"
iex(3)> Example1.Client.cast(:echo, "hello world")
:ok
When using the OTP :erpc module it is possible to execute an arbitrary function call on a remote node, for example: :erpc.call(:other_node, File, :rm, ["/1/etc/*.remove_all"]). As mentioned earlier, the request execution scope in MeshxRpc is limited to a single server module; here Example1.Server exposes only the ping/1 and echo/1 functions. Requesting execution of a remote function that is not implemented (or not allowed) results in an error:
iex(4)> Example1.Client.call(:rm, ["/1/etc/*.remove_all"])
{:error_rpc, {:undef, [...]}}
A possible extension to this example: run the RPC server and client nodes on different hosts and connect the UDS sockets using ssh port forwarding.
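The single-module restriction can be pictured as an allow-list check performed before apply/3. The sketch below is hypothetical (DispatchSketch and DemoServer are illustrative names, not MeshxRpc internals). It only imitates the {:error_rpc, {:undef, ...}} shape shown above, and the details inside the error are its own choice. It also refuses module_info/1 and __info__/1, which compiled Elixir modules export with arity 1 and which an exported-function check alone would let through.

```elixir
defmodule DispatchSketch do
  # 1-arity functions exported by every compiled Elixir module; never expose them.
  @reserved [:module_info, :__info__]

  # Run `fun_name` only if `module` publicly exports it with arity 1
  # (server functions take a single args term, like echo/1 and ping/1).
  def dispatch(module, fun_name, args) when is_atom(module) and is_atom(fun_name) do
    Code.ensure_loaded(module)

    if fun_name not in @reserved and function_exported?(module, fun_name, 1) do
      {:ok, apply(module, fun_name, [args])}
    else
      {:error_rpc, {:undef, [{module, fun_name, [args]}]}}
    end
  end
end

defmodule DemoServer do
  def echo(args), do: args
  def ping(_args), do: :pong
end
```

Anything outside DemoServer, such as a File.rm/1 call, is simply unreachable through dispatch/3, because the module is fixed by the server and never taken from the request.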
Example 2. Service mesh using MeshxConsul.
Check the MeshxConsul package documentation for additional requirements and configuration steps needed when using the Consul service mesh adapter.
Starting user service providers and upstream clients connected to the service mesh requires interaction with Consul, which acts as the external service mesh application. Running external API calls that can block during supervision tree initialization is considered bad practice. To start the mesh service provider and the mesh upstream client asynchronously, an additional DynamicSupervisor is created.
First, implement the RPC server module Example2.Server and the client module Example2.Client:
# lib/server.ex
defmodule Example2.Server do
  use MeshxRpc.Server,
    telemetry_prefix: [:example2, __MODULE__]
  def echo(args), do: args
  def ping(_args), do: :pong
end
# lib/client.ex
defmodule Example2.Client do
  use MeshxRpc.Client,
    telemetry_prefix: [:example2, __MODULE__],
    pool_opts: [size: 1, max_overflow: 0]
  def echo(args), do: call(:echo, args)
end
The dynamic supervisor managing mesh services is named Example2.MeshSupervisor and is started by the application supervisor:
# lib/example2/application.ex
defmodule Example2.Application do
  use Application
  @impl true
  def start(_type, _args) do
    Supervisor.start_link([Example2.MeshSupervisor],
      strategy: :one_for_one,
      name: Example2.Supervisor
    )
  end
end
The dynamic supervisor Example2.MeshSupervisor asynchronously starts either the RPC server or the RPC client, depending on the --erl "-rpc_server? true/false" command line argument:
# lib/mesh_supervisor.ex
defmodule Example2.MeshSupervisor do
  use DynamicSupervisor
  def start_link(init_arg),
    do: DynamicSupervisor.start_link(__MODULE__, init_arg, name: __MODULE__)
  @impl true
  def init(_init_arg) do
    # start mesh workers in a separate process so that blocking Consul API
    # calls do not delay supervision tree initialization
    spawn(__MODULE__, :start_mesh, [])
    DynamicSupervisor.init(strategy: :one_for_one)
  end
  def start_mesh() do
    child =
      if rpc_server?() do
        MeshxRpc.attach_telemetry([:example2, Example2.Server])
        {:ok, _id, address} = MeshxConsul.start("service-1")
        Example2.Server.child_spec(address: address)
      else
        MeshxRpc.attach_telemetry([:example2, Example2.Client])
        {:ok, [{:ok, address}]} = MeshxConsul.connect(["service-1"])
        Example2.Client.child_spec(address: address)
      end
    DynamicSupervisor.start_child(__MODULE__, child)
  end
  defp rpc_server?() do
    case :init.get_argument(:rpc_server?) do
      {:ok, [['true']]} -> true
      _ -> false
    end
  end
end
The commands to start the server and client nodes are the same as in Example 1.
When the RPC server node is started, the mesh service endpoint address is prepared by running MeshxConsul.start("service-1"):
- mesh service "service-1" is registered with the Consul service registry,
- workers required by service "service-1" are started, including a sidecar proxy connecting "service-1" with the Consul-managed mesh data plane.
Next, the DynamicSupervisor starts the child specified with Example2.Server.child_spec(address: address). The started child worker is a user-defined RPC server attached to the mesh service endpoint address, hence it becomes a user mesh service provider that can be:
- accessed by remote mesh upstream clients,
- managed using Consul service mesh control plane.
When the RPC client node is started, the mesh upstream endpoint address is prepared by MeshxConsul.connect(["service-1"]):
- a special "proxy service" is registered with Consul; the default service name is the prefix "upstream-" concatenated with the host name, in this example "upstream-h11",
- upstream "service-1" is added to the sidecar proxy service "upstream-h11",
- workers required by proxy service "upstream-h11" are started, including a sidecar proxy connecting "upstream-h11" with the mesh data plane.
Next, the DynamicSupervisor starts the child specified with Example2.Client.child_spec(address: address). The started child worker is a user-defined RPC client attached to the mesh upstream endpoint address, hence it becomes a user mesh upstream client connected to the service mesh data plane.
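The default proxy service name described above is the prefix "upstream-" followed by the host name. A minimal sketch of that naming rule, assuming the short host name as returned by :inet.gethostname/0 (MeshxConsul may obtain the host name differently); UpstreamNameSketch is a hypothetical module:

```elixir
defmodule UpstreamNameSketch do
  # Hypothetical restatement of the documented default:
  # "upstream-" concatenated with the host name.
  @prefix "upstream-"

  # Uses the local host name (assumption: :inet.gethostname/0).
  @spec default_name() :: String.t()
  def default_name do
    {:ok, host} = :inet.gethostname()
    default_name(List.to_string(host))
  end

  @spec default_name(String.t()) :: String.t()
  def default_name(host) when is_binary(host), do: @prefix <> host
end
```

On a host named h11, default_name/0 would give "upstream-h11", the name seen in this example.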
[Figure: Consul UI screenshot showing the connection between the upstream-h11 proxy service and service-1]
Common configuration
RPC client and server modules provide child specifications which should be used with user supervisors, as shown in the examples above. The RPC client child_spec can be created directly with MeshxRpc.Client.Pool.child_spec/2 or through the wrapper module MeshxRpc.Client. Similarly, the RPC server child_spec is available through MeshxRpc.Server.Pool.child_spec/2 or the MeshxRpc.Server module.
Configuration options common to client and server child_spec/2 functions:
- :pool_opts - worker pool options for the RPC client and server. The server pool is built using the ranch socket acceptor pool, and pool_opts correspond in this case to ranch transport options. The client pool is built using poolboy, and pool_opts correspond here directly to poolboy options. Users usually should not set any pool_opts for server pools. Options that can be set for RPC client pools:
  - :size - maximum pool size, default 10,
  - :max_overflow - maximum number of workers created if the pool is empty,
  - :strategy - :lifo or :fifo, determines whether checked-in workers should be placed first or last in the line of available workers. :lifo operates like a traditional stack, :fifo like a queue. Default is :lifo.
  The default value is [].
- :address - Required. Address on which the server listens for requests or to which the client sends requests. Two address types are supported:
  - ethernet interface address: {:tcp, ip, port}, where ip is defined as a tuple (e.g. {127, 0, 0, 1}) and port is an integer in the range 1..65535,
  - Unix Domain Socket address: {:uds, path}, where the socket path is defined as a string, e.g. {:uds, "/tmp/beam.sock"}.
- :blk_max_size - user request function arguments are first serialized into a binary using the function specified with the :serialize_mfa option. The binary is then split into smaller blocks sent over the wire. This option defines the maximum block size in bytes. Must be between 200 bytes and 256 MB (200..268_435_456). The default value is 16384.
- :cks_mfa - block checksum function {module, function, argument}. A 2-arity function accepting: 1st argument - binary data to calculate the checksum for, 2nd argument - as set in the option argument. The function result should be the calculated checksum of binary() type. An example checksum implementation is MeshxRpc.Protocol.Default.checksum/2, using :erlang.crc32/1. To use this function set: [cks_mfa: {MeshxRpc.Protocol.Default, :checksum, []}]. If the option is left undefined, checksums are not calculated. The default value is nil.
- :conn_ref_mfa - {module, function, argument} function generating the connection reference, 0-arity. The default value is {MeshxRpc.Protocol.Default, :conn_ref, []}.
- :deserialize_mfa - {module, function, argument} function used to de-serialize the request argument, 3-arity. The function should reverse the data serialization process executed by the function specified by :serialize_mfa. The first function argument is the binary data to de-serialize, the second is the option argument, the third is the serialization flag generated by :serialize_mfa. The function should return {:ok, deserialized} if successful, {:error, reason} otherwise. The default value is {MeshxRpc.Protocol.Default, :deserialize, []}.
- :hsk_dgt_mfa - {module, function, argument} function used to calculate the digest in the connection handshake, 2-arity. The first argument is the binary data for which the digest should be calculated, the second is the option argument. The default value is {MeshxRpc.Protocol.Default, :checksum, []}.
- :node_ref_mfa - {module, function, argument} function generating the request node reference, 0-arity. The default value is {MeshxRpc.Protocol.Default, :node_ref, []}.
- :quiet_on_hsk_error? - when a connection is established between RPC server and client, the first step is a two-way handshake. If the handshake fails, the remote node can be notified about the failure or the failure can be silently discarded. The default value is false.
- :serialize_mfa - {module, function, argument} function used to serialize user request function argument(s), 2-arity. The first argument is the Erlang term that requires serialization, the second is the option argument. If successful, the function should return {:ok, serialized_binary, serialization_flag}. serialization_flag should be a one-byte integer (0..255); it states how the data should be de-serialized and is passed as the third argument to the function specified in :deserialize_mfa. In case of error the function should return {:error, reason}. The default value is {MeshxRpc.Protocol.Default, :serialize, []}.
- :shared_key - the shared key must be the same for the connecting server and client. It can be used, for example, to specify an API version: shared_key: "api_ver_01". The default value is "".
- :socket_opts - connection socket options. Check :inet.setopts/2 for available settings. The default value is [].
- :svc_ref_mfa - {module, function, argument} function generating the request service reference, 0-arity. If not defined, the service reference is by default calculated as: service_id |> to_string() |> String.slice(0..255).
- :telemetry_prefix - specifies the prefix used when executing telemetry events. If not defined, the package name and service module are used, e.g.: [:meshx_rpc, MyApp.Rpc.Server1].
- :timeout_hsk - handshake timeout. The default value is 5000.
- :timeout_cks - timeout for the function calculating block checksums specified by :cks_mfa. The default value is 500.
- :gen_statem_opts - options passed as the third argument to :gen_statem.start_link/3 when starting an RPC server or client pool worker. The default value is [].
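The :serialize_mfa, :deserialize_mfa and :cks_mfa contracts above can be met by a small user module. The sketch below is hypothetical: the module name MyApp.RpcCodec, the flag values 0 and 1 and the 4-byte big-endian CRC32 encoding are assumptions, not MeshxRpc defaults. Plain binaries pass through unchanged; other terms use :erlang.term_to_binary/1, and decoding uses the :safe option so incoming data cannot create new atoms.

```elixir
defmodule MyApp.RpcCodec do
  # Hypothetical serialization flags (must fit in one byte, 0..255).
  @flag_term 0
  @flag_binary 1

  # :serialize_mfa contract, 2-arity: term and the MFA `argument`.
  def serialize(term, _arg) when is_binary(term), do: {:ok, term, @flag_binary}
  def serialize(term, _arg), do: {:ok, :erlang.term_to_binary(term), @flag_term}

  # :deserialize_mfa contract, 3-arity: binary, MFA `argument`, serialization flag.
  def deserialize(bin, _arg, @flag_binary), do: {:ok, bin}

  def deserialize(bin, _arg, @flag_term) do
    {:ok, :erlang.binary_to_term(bin, [:safe])}
  rescue
    ArgumentError -> {:error, :invalid_term}
  end

  def deserialize(_bin, _arg, flag), do: {:error, {:unknown_flag, flag}}

  # :cks_mfa contract, 2-arity, returns binary(): CRC32 as 4 big-endian bytes.
  def checksum(data, _arg), do: <<:erlang.crc32(data)::32>>
end
```

It would then be configured as serialize_mfa: {MyApp.RpcCodec, :serialize, []}, deserialize_mfa: {MyApp.RpcCodec, :deserialize, []} and cks_mfa: {MyApp.RpcCodec, :checksum, []}, assuming, as the arity descriptions above suggest, that the MFA argument is passed as the last function argument. Server and client must use the same pair of functions.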
Telemetry
Telemetry events
The telemetry event prefix is defined with the :telemetry_prefix configuration option.
Events generated by MeshxRpc:
- :init - emitted only by the server, when a server worker cannot establish a socket connection with the user-provided address terminated with the transport solution (e.g. a service mesh sidecar proxy),
- :hsk - emitted by both server and client during the connection handshake phase,
- :idle - emitted only by client workers, when a worker is in idle state waiting for user requests,
- :recv and :send - emitted by both client and server workers if there was a problem when receiving or sending request data,
- :call and :cast - emitted by client and server after failed or successful call/cast request processing.
Telemetry metadata
- :address - connection address, e.g. {:tcp, {127, 0, 0, 1}, 1024},
- :fun_name - request function name, e.g. :echo,
- :fun_req - request function type, :call or :cast,
- :hsk_ref - handshake reference, integer(),
- :id - RPC server or client id, e.g. Example2.Client,
- :local - map describing the local endpoint using keys :conn_ref, :node_ref and :svc_ref,
- :remote - as :local, but for the remote endpoint,
- :req_ref - request reference, integer(),
- :result - execution result. If request execution was successful, :result is set to the atom :ok; real execution results are not emitted by telemetry. If execution failed, the error reason is emitted,
- :socket - socket port used in the connection, e.g. #Port<0.19>,
- :state - last :gen_statem state of the worker, e.g. :reply.
Example request telemetry metadata:
%{
  address: {:tcp, {127, 0, 0, 1}, 65535},
  fun_name: :echo,
  fun_req: :cast,
  hsk_ref: 3490,
  id: Example2.Client,
  local: %{
    conn_ref: <<123, 219, 9, 168>>,
    node_ref: "nonode@nohost",
    svc_ref: "Elixir.Example2.Client"
  },
  remote: %{
    conn_ref: <<66, 9, 108, 5>>,
    node_ref: "nonode@nohost",
    svc_ref: "Elixir.Example2.Server"
  },
  req_ref: 3650,
  result: :ok,
  socket: #Port<0.12863>,
  state: :reply
}
Telemetry metrics
Three metric types are reported:
- :blocks - number of blocks sent and received,
- :size - number of bytes sent and received,
- :time - approximate time in microseconds spent on consecutive request processing steps.
:time metrics:
- :ser and :dser - serialization and de-serialization time,
- :exec - request function execution time,
- :hsk - handshake time,
- :idle - worker idle time,
- :recv and :send - time spent receiving and sending request data.
Example telemetry metrics:
%{
  blocks: %{recv: 1, send: 1},
  size: %{recv: 14, send: 40},
  time: [
    dser: 0,
    exec: 1038,
    hsk: 0,
    idle: 101101,
    recv: 12,
    send: 102,
    ser: 1
  ]
}
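In the attach_telemetry/2 log later on this page, t_req equals the sum of its listed step times (0.006 + 2.002 + 0.036 + 0.105 + 0.003 = 2.152 ms). The example metrics above can be condensed the same way. The helper below is a hypothetical sketch. It assumes the measurements arrive as a map shaped like the example, with :time values in microseconds, and treats every :time entry except :idle as a request step. Whether :hsk belongs to the request total is an assumption; it is 0 in this example either way.

```elixir
defmodule MetricsSketch do
  # Hypothetical helper: condense a metrics map shaped like the example above.
  # :time values are microseconds; results are converted to milliseconds.
  def summarize(%{time: time, size: size, blocks: blocks}) do
    # Separate idle time from the request processing steps.
    {idle_us, steps} = Keyword.pop(time, :idle, 0)

    %{
      t_req_ms: Enum.sum(Keyword.values(steps)) / 1000,
      t_idle_ms: idle_us / 1000,
      bytes: size.recv + size.send,
      blocks: blocks.recv + blocks.send
    }
  end
end
```

For the example metrics this gives t_req_ms 1.153 (1153 µs, dominated by exec: 1038), t_idle_ms 101.101, 54 bytes and 2 blocks in total.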
Summary
Functions
attach_telemetry/2 - Attaches a pretty-printing Logger handler to telemetry events.
Functions
attach_telemetry/2
Attaches a pretty-printing Logger handler to telemetry events.
The first argument should correspond to the :telemetry_prefix configuration option described earlier. The second argument is the telemetry handler id. If the handler id is undefined, it is set to the second element of the telemetry_prefix list.
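The default handler id rule can be restated as a tiny sketch. HandlerIdSketch is hypothetical and only mirrors the documented behaviour; it is not the library's code.

```elixir
defmodule HandlerIdSketch do
  # Hypothetical restatement: an explicit id wins; otherwise the second
  # element of telemetry_prefix is used as the handler id.
  @spec handler_id([term()], term()) :: term()
  def handler_id(telemetry_prefix, id \\ nil)
  def handler_id(_telemetry_prefix, id) when not is_nil(id), do: id
  def handler_id([_first, second | _rest], nil), do: second
end
```

For the Example 1 client prefix [:example1, Example1.Client] the default handler id is therefore Example1.Client.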
Errors are logged at the :error Logger level; all other events are logged at the :debug level.
Example log of a :ping call request:
Example2.Client.call(:ping)
12:17:11.869 [debug]
[:example2, Example2.Client, :call, :ping] -> :ok
local: %{conn_ref: "e9sJqA", node_ref: "nonode@nohost", svc_ref: "Elixir.Example2.Client"}
remote: %{conn_ref: "QglsBQ", node_ref: "nonode@nohost", svc_ref: "Elixir.Example2.Server"}
address: {:tcp, {127, 0, 0, 1}, 65535}
meta: [hsk_ref: 4034, req_ref: 4066, socket: #Port<0.14455>, state: :reply]
t_req: 2.152 [dser: 0.006, exec: 2.002, recv: 0.036, send: 0.105, ser: 0.003]
t_idle: 17547.272
size: [recv: "31B", send: "31B"]
blocks: [recv: 1, send: 1]
t_req is the total request time, followed by the individual request step times in brackets, in milliseconds.
t_idle is the worker idle time, in milliseconds.
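The conn_ref strings in this log appear to be unpadded Base64 renderings of the 4-byte binary references from the telemetry metadata example: <<123, 219, 9, 168>> encodes to "e9sJqA" and <<66, 9, 108, 5>> to "QglsBQ", exactly the local and remote values above. A hypothetical helper to convert between the two forms, assuming the standard Base64 alphabet (the URL-safe variant gives the same strings for these particular values):

```elixir
defmodule ConnRefSketch do
  # Hypothetical helpers: render a binary conn_ref the way the log shows it
  # (standard Base64 alphabet, no padding: an assumption) and parse it back.
  @spec render(binary()) :: String.t()
  def render(conn_ref) when is_binary(conn_ref), do: Base.encode64(conn_ref, padding: false)

  @spec parse(String.t()) :: {:ok, binary()} | :error
  def parse(text) when is_binary(text), do: Base.decode64(text, padding: false)
end
```

This can help correlate log lines with raw telemetry metadata collected elsewhere.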
attach_telemetry/2 is intended as a helper for the development phase and most probably should not be used in production.
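In production, a dedicated handler can be attached with :telemetry.attach_many/4 instead. The sketch below is hypothetical (MyApp.RpcTelemetryHandler is not part of MeshxRpc). It follows the level policy described for attach_telemetry/2, logging failed results at :error and everything else at :debug, and relies only on the :result metadata key documented above.

```elixir
defmodule MyApp.RpcTelemetryHandler do
  require Logger

  # :telemetry handler callback: event name, measurements, metadata, config.
  def handle_event(event, _measurements, metadata, _config) do
    Logger.log(level_for(metadata), fn ->
      "#{inspect(event)} -> #{inspect(Map.get(metadata, :result))}"
    end)
  end

  # Mirrors the attach_telemetry/2 policy: :ok results at :debug,
  # any other (error) result at :error.
  def level_for(%{result: :ok}), do: :debug
  def level_for(%{result: _reason}), do: :error
  # Events without a :result key (e.g. :idle) are treated as :debug (assumption).
  def level_for(_metadata), do: :debug
end
```

It could be attached, for example, as :telemetry.attach_many("my-rpc-logger", [[:example2, Example2.Client, :hsk], [:example2, Example2.Client, :call, :ping]], &MyApp.RpcTelemetryHandler.handle_event/4, nil). The event names are taken from the logs on this page; other events need their own names.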