Amoc
Mix.install([
{:ranch, "~> 2.1"},
{:jiffy, "~> 1.1"},
{:gen_state_machine, "~> 2.0"},
{:uuid, "~> 1.1"},
{:amoc, "~> 3.0"},
{:kino, "~> 0.11.3"},
{:vega_lite, "~> 0.1.6"},
{:kino_vega_lite, "~> 0.1.10"},
{:kino_explorer, "~> 0.1.11"}
])
Getting Started
In this book, we're going to experiment with some barebones load testing cases. We will define a server, a scenario, some instrumentation, and we will load-test the server and get some results.
Let's get started!
The Server
For this demo, we first need something to load-test, so let's create a very basic, super-private messaging server. The server is a barebones message router and nothing more. It works with JSON payloads, and the protocol is defined as follows:
- On user connect, the user provides `%{"connect" => user_id}`, where `user_id` is any binary, for example a UUIDv4 or something even harder to guess:
  - The server checks only that no other user is connected with this `user_id`:
    - If the `user_id` is free, it accepts the connection;
    - If the `user_id` is taken, it closes the connection.
- On a connected session, the server takes any JSON payload, provided it has two mandatory fields, `from` and `to`:
  - The server checks only that the specified `from` matches the one the connection was established with:
    - If it matches, it routes the entire payload to `to`, transparently;
    - If it doesn't, it does not route the message, treating it as a phishing attempt.
  - Note that in neither case does the server tell the client anything about whether the peer was connected or whether the message was dropped as phishing, so as not to leak any knowledge about the intention of the payload. It is the responsibility of the peer, if it wants to, to react to the message.
How does the routing work? Simply: the server delivers the payload to the recipient if that recipient is connected, and otherwise does nothing. Since the recipient's `user_id` is arbitrarily chosen by the recipient itself, a sender can only write to a recipient whose `user_id` it already knows.
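To make the protocol concrete, here is roughly what travels over the wire. The ids and message text below are made up for illustration, and the msg/msg_id fields are simply what our scenario will use later on, not something the server requires:
# Sent by a client right after connecting; the server answers with
# %{"connected" => user_id} or %{"rejected" => "duplicate"}.
connect = %{"connect" => "3f6c1c9e-0bcb-4d3a-9d6e-2f1f0a1c2b3d"}

# A routable payload: "from" must match the user_id of this connection,
# "to" names the recipient; everything else is forwarded untouched.
message = %{
  "from" => "3f6c1c9e-0bcb-4d3a-9d6e-2f1f0a1c2b3d",
  "to" => "some-other-user-id",
  "msg" => "hello there",
  "msg_id" => "any-opaque-message-id"
}

# On the wire both are plain JSON, encoded with :jiffy.encode/1.
{:jiffy.encode(connect), :jiffy.encode(message)}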
See the code below!
The Session Manager
defmodule MessagingServer.SM do
@moduledoc """
This is a simple wrapper around an ETS table that stores {user_id, pid} pairs,
in order to be able to deliver messages from one user_id to another.
"""
use GenServer
@table __MODULE__
@type user_id() :: term()
@doc "Puts a _new_ user in the session table"
def put_new(user_id, pid) do
:ets.insert_new(@table, {user_id, pid})
end
@doc "Gets the pid of a user by its ID, if the ID is registered"
def get(user_id) do
case :ets.lookup_element(@table, user_id, 2, {:error, :not_found}) do
{:error, :not_found} -> {:error, :not_found}
pid -> pid
end
end
@doc "Removes a user from the session table"
def delete(user_id) do
:ets.delete(@table, user_id)
end
def start_link() do
GenServer.start_link(__MODULE__, :no_args, [])
end
def init(_) do
:ets.new(@table, [
:named_table,
:public,
:set,
write_concurrency: true,
read_concurrency: true
])
{:ok, :no_state}
end
end
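Once the session manager is running (it will be started by the supervisor below), its API can be exercised like this; the "alice" id is of course just a hypothetical example:
true = MessagingServer.SM.put_new("alice", self())
# a second registration with the same id is rejected
false = MessagingServer.SM.put_new("alice", self())
# get/1 returns the pid we registered, self() in this case
pid = MessagingServer.SM.get("alice")
true = MessagingServer.SM.delete("alice")
{:error, :not_found} = MessagingServer.SM.get("alice")
pid == self()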
The C2S code
This module implements the protocol, as a very simple state machine.
defmodule MessagingServer.C2S do
@moduledoc """
Implements the server-side protocol
"""
use GenStateMachine, callback_mode: :handle_event_function
require Record
require Logger
Record.defrecord(:data, ref: nil, transport: nil, socket: nil)
@type state() :: :connecting | {:connected, term()}
def start_link(ref, transport, opts) do
GenStateMachine.start_link(__MODULE__, {ref, transport, opts}, [])
end
def init({ref, transport, _}) do
do_connect = {:next_event, :internal, {:do_connect, ref}}
{:ok, :connecting, data(ref: ref, transport: transport), do_connect}
end
## get ranch handshake and socket
def handle_event(:internal, {:do_connect, ref}, :connecting, data) do
{:ok, socket} = :ranch.handshake(ref)
full_data = data(data, socket: socket)
active_once(full_data)
{:next_state, :connecting, full_data}
end
## From device to server
## Connecting a session
def handle_event(:internal, %{"connect" => user_id}, :connecting, data) do
case MessagingServer.SM.put_new(user_id, self()) do
true ->
payload = %{"connected" => user_id}
transport_send(data, payload)
{:next_state, {:connected, user_id}, data}
false ->
payload = %{"rejected" => "duplicate"}
transport_send(data, payload)
{:stop, :rejected_connection_because_of_duplicate}
end
end
## Sending messages
def handle_event(:internal, payload, {:connected, user_id}, _) do
case payload do
## user_id =:= from -> no phishing
%{"from" => ^user_id, "to" => to} ->
case MessagingServer.SM.get(to) do
{:error, :not_found} ->
:keep_state_and_data
pid ->
send(pid, {:route, payload})
:keep_state_and_data
end
_ ->
:keep_state_and_data
end
end
## From peer to peer
## Receiving messages
def handle_event(:info, {:route, payload}, {:connected, user_id}, data) do
case payload do
## user_id =:= to
%{"to" => ^user_id} ->
transport_send(data, payload)
:keep_state_and_data
## message wasn't to me, ignore it without revealing who wrote to me
_ ->
:keep_state_and_data
end
end
## TCP related handlers
def handle_event(:info, {:tcp_closed, socket}, _, data(socket: socket)) do
{:stop, :normal}
end
def handle_event(:info, {:tcp_error, socket, reason}, _, data(socket: socket)) do
{:stop, reason}
end
def handle_event(:info, {:tcp, socket, packet}, _, data = data(socket: socket)) do
case decode_all(packet) do
:bad_json_received ->
{:stop, :bad_json_received}
events ->
active_once(data)
{:keep_state_and_data, events}
end
end
## Match-all: log and ignore anything unexpected
def handle_event(event_type, event_content, _state, _data) do
Logger.warning("unexpected event: #{inspect({event_type, event_content})}")
:keep_state_and_data
end
def terminate(reason, state, d = data(socket: socket, transport: transport))
when socket != nil and transport != nil do
try do
apply(transport, :close, [socket])
catch
_, _ -> :ok
end
terminate(reason, state, data(d, socket: nil, transport: nil))
end
def terminate(reason, {:connected, user_id}, data) do
MessagingServer.SM.delete(user_id)
terminate(reason, :connecting, data)
end
def terminate(_, _, _) do
:ok
end
defp active_once(data(transport: transport, socket: socket)) do
:ok = transport.setopts(socket, active: :once)
end
defp transport_send(data(transport: transport, socket: socket), payload) do
packet = :jiffy.encode(payload)
transport.send(socket, packet)
end
defp decode_all(packet, events \\ []) do
try do
case :jiffy.decode(packet, [:return_maps, :return_trailer]) do
{:has_trailer, payload, trailer} ->
payload_event = {:next_event, :internal, payload}
decode_all(trailer, [payload_event | events])
payload ->
payload_event = {:next_event, :internal, payload}
Enum.reverse([payload_event | events])
end
catch
_, _ -> :bad_json_received
end
end
end
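One detail worth calling out: TCP is a stream, so a single active: :once packet may carry more than one JSON object (or, in an unlucky case, a partial one, which fails to decode and simply drops the connection — an acceptable trade-off for a toy server). The decode_all/2 helper relies on jiffy's :return_trailer option to peel complete objects off one by one. A tiny illustration with made-up payloads:
packet = ~s({"a":1}{"b":2})

# With a trailing remainder, jiffy returns {:has_trailer, decoded, rest}.
{:has_trailer, first, rest} = :jiffy.decode(packet, [:return_maps, :return_trailer])

# The remainder can then be decoded on its own.
second = :jiffy.decode(rest, [:return_maps])

{first, second}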
The Supervisor and Application modules
defmodule MessagingServer.Sup do
use Supervisor
def start_link(_) do
Supervisor.start_link(__MODULE__, [], name: __MODULE__)
end
def init([]) do
ranch =
:ranch.child_spec(
MessagingServer,
:ranch_tcp,
%{socket_opts: [port: 8888]},
MessagingServer.C2S,
[]
)
sm = %{id: MessagingServer.SM, start: {MessagingServer.SM, :start_link, []}}
sup_flags = %{strategy: :one_for_all, intensity: 0, period: 1}
child_specs = [sm, ranch]
{:ok, {sup_flags, child_specs}}
end
end
defmodule MessagingServer do
use Application
def start(_type, _args) do
children = [MessagingServer.Sup]
Supervisor.start_link(children, strategy: :one_for_one)
end
end
MessagingServer.start(:normal, [])
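Before writing the Amoc scenario, it can be reassuring to poke the server by hand. Here is a minimal smoke test, assuming the server above is listening on port 8888; the user id is made up:
{:ok, socket} = :gen_tcp.connect(~c"localhost", 8888, [:binary, active: false])
:ok = :gen_tcp.send(socket, :jiffy.encode(%{"connect" => "smoke-test-user"}))
{:ok, reply} = :gen_tcp.recv(socket, 0, 1_000)
ack = :jiffy.decode(reply, [:return_maps])
:gen_tcp.close(socket)
# We expect ack to be %{"connected" => "smoke-test-user"}
ack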
The Scenario
Developing a scenario
TL;DR: https://hexdocs.pm/amoc/scenario.html
Now, say you've read the official guide: you know we need two special callbacks, `init/0` and `start/1,2`. Let's separate them into different modules, for readability.
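For orientation, a bare scenario looks roughly like the sketch below. This is not the scenario we will run, just the shape of the behaviour: init/0 runs once and its {:ok, state} result is handed to every user, while start/2 runs once per simulated user, in that user's own process.
defmodule SkeletonScenario do
  @behaviour :amoc_scenario

  # Runs once, before any user is started.
  def init, do: {:ok, %{some: :shared_state}}

  # Runs once per simulated user, in its own process.
  def start(user_id, shared_state) do
    IO.inspect({user_id, shared_state}, label: "user started")
  end
end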
The user process
We first define a simple struct that will hold the permanent data for the connection, such as the socket and some configuration. Then, we define a state machine that knows how to send messages. After a connection is established:
- Every ~1 second it sends a short random message (~16-24 characters) to some randomly selected neighbour
- If it receives a message, it queues a reply in its list of pending messages and carries on with its ~1-second loop
- The user caps the total number of messages it will send at ~20-30
- When it can neither send any more messages nor has received any for long enough (~3 seconds), it dies
defmodule LoadTest.User.Data do
defstruct([:id, :neighbours, :socket])
end
defmodule LoadTest.User do
alias LoadTest.User.Data, as: Data
use GenStateMachine, callback_mode: :handle_event_function
# Users are never started via GenStateMachine.start_link/3; each amoc user
# process enters the state-machine loop directly with :gen_statem.enter_loop/5
# (see LoadTest.Start below), so this callback should never be invoked.
def init(_) do
raise "This should be unreachable"
end
def handle_event(:info, {:tcp_error, socket, reason}, _, %Data{socket: socket}) do
{:stop, reason}
end
def handle_event(:info, {:tcp, socket, packet}, _, %Data{socket: socket}) do
case decode_all(packet) do
:bad_json_received ->
{:stop, :bad_json_received}
events ->
{:keep_state_and_data, events}
end
end
def handle_event(:info, {:EXIT, _pid, reason}, _, _) do
{:stop, reason}
end
## Connecting a session
def handle_event(:internal, :do_connect, :connecting, data = %Data{id: my_id}) do
payload = %{"connect" => my_id}
transport_send(data, payload)
{:next_state, :waiting_for_connection_ack, data}
end
def handle_event(
:internal,
%{"connected" => my_id},
:waiting_for_connection_ack,
%Data{id: my_id, neighbours: neighbours} = data
) do
left = num_of_messages()
pending = build_pendings(my_id, neighbours)
schedule = [{:next_event, {:timeout, :time_to_send_one}, :time_to_send_one}]
{:next_state, {:connected, left, pending}, data, schedule}
end
## This is it, it really can't send any more messages, nor has it received any, DIE!
def handle_event(:timeout, :inactive_long_enough, {:connected, 0, _}, _) do
:stop
end
## It is still receiving messages: hang around a bit longer to read them, but then die
def handle_event(_, _, {:connected, 0, _}, _) do
schedule = [{:timeout, time_to_die_if_inactive(), :inactive_long_enough}]
{:keep_state_and_data, schedule}
end
## Nothing has really happened in a while, disconnect the user
def handle_event(:timeout, :inactive_long_enough, _, _) do
:stop
end
## Time to send a message
def handle_event(
{:timeout, :time_to_send_one},
:time_to_send_one,
{:connected, left, pending},
data
) do
send_now = Enum.random(pending)
new_pending = List.delete(pending, send_now)
transport_send(data, send_now)
case {left, Enum.count(new_pending)} do
{1, _} ->
schedule = [{:timeout, time_to_die_if_inactive(), :inactive_long_enough}]
{:next_state, {:connected, 0, []}, data, schedule}
{_, 0} ->
schedule = [{:timeout, time_to_die_if_inactive(), :inactive_long_enough}]
{:next_state, {:connected, left - 1, new_pending}, data, schedule}
_ ->
schedule = [{{:timeout, :time_to_send_one}, time_to_respond(), :time_to_send_one}]
{:next_state, {:connected, left - 1, new_pending}, data, schedule}
end
end
## I received a message, note that we match the Data.id with the "to" field
def handle_event(
:internal,
%{"to" => my_id} = payload,
{:connected, left, pending},
%Data{id: my_id} = data
) do
new_pending = [build_answer(my_id, payload) | pending]
case Enum.count(pending) do
## then we know nothing was scheduled yet, so schedule a send now
0 ->
schedule = [{{:timeout, :time_to_send_one}, time_to_respond(), :time_to_send_one}]
{:next_state, {:connected, left, new_pending}, data, schedule}
## Something is already scheduled, so this only appends to pending
_ ->
{:next_state, {:connected, left, new_pending}, data}
end
end
def handle_event(_, _, _, _) do
:keep_state_and_data
end
def terminate(_, _, %Data{socket: socket}) do
:gen_tcp.close(socket)
end
defp transport_send(%Data{socket: socket}, payload) do
packet = :jiffy.encode(payload)
:gen_tcp.send(socket, packet)
end
defp decode_all(packet, events \\ []) do
try do
case :jiffy.decode(packet, [:return_maps, :return_trailer]) do
{:has_trailer, payload, trailer} ->
payload_event = {:next_event, :internal, payload}
decode_all(trailer, [payload_event | events])
payload ->
payload_event = {:next_event, :internal, payload}
Enum.reverse([payload_event | events])
end
catch
_, _ -> :bad_json_received
end
end
defp build_pendings(my_id, range) do
Enum.map(range, payload_builder(my_id))
end
defp build_answer(my_id, %{"from" => from_id, "msg_id" => msg_id}) do
%{
"from" => my_id,
"to" => from_id,
"msg" => generate_msg(),
"msg_id" => UUID.uuid4(),
"resp_to" => msg_id
}
end
defp payload_builder(my_id) do
fn i ->
id = Integer.to_string(i)
%{"from" => my_id, "to" => id, "msg" => generate_msg(), "msg_id" => UUID.uuid4()}
end
end
## Approximately 20..30 messages
defp num_of_messages() do
:rand.uniform(10) + 20
end
## Approximately ~1s
defp time_to_respond() do
:rand.uniform(200) + 900
end
## Approximately ~3s
defp time_to_die_if_inactive() do
:rand.uniform(200) + 2900
end
## generate a message between 16 and 24 characters long
defp generate_msg() do
(10 + :rand.uniform(6))
|> :crypto.strong_rand_bytes()
|> Base.encode64()
end
end
Initialisation
We start by implementing the initialisation module. It reads the scenario configuration exposed through :amoc_config and returns it as the shared state that will be handed to every user.
defmodule LoadTest.Init do
def init do
{:ok, build_state()}
end
defp build_state() do
host = :amoc_config.get(:host)
port = :amoc_config.get(:port)
%{host: host, port: port}
end
end
User runs
And here we define the function that each user process will execute. Note that rather than starting a new process, each amoc user process enters the state-machine loop directly via :gen_statem.enter_loop/5:
defmodule LoadTest.Start do
def start(my_id, %{host: host, port: port}) do
{:ok, socket} = :gen_tcp.connect(host, port, [:binary, active: true])
data = %LoadTest.User.Data{
id: Integer.to_string(my_id),
neighbours: build_neighbours(my_id),
socket: socket
}
try do
schedule = [{:next_event, :internal, :do_connect}]
:gen_statem.enter_loop(LoadTest.User, [], :connecting, data, schedule)
catch
:exit, :normal -> :ok
end
end
defp build_neighbours(my_id) do
n = :amoc_config.get(:number_of_neighbours)
a = my_id - n
b = my_id + n
a..b
|> Enum.to_list()
|> List.delete(my_id)
end
end
The load testing module
At last, the load-testing module itself: it implements the :amoc_scenario behaviour, its two main callbacks simply delegate to the two modules defined above, and it declares the configuration variables the scenario expects.
defmodule LoadTest do
@behaviour :amoc_scenario
Module.register_attribute(__MODULE__, :required_variable, persist: true)
@required_variable [
%{:name => :host, :default_value => ~c"localhost", :description => ~c"host to connect to"},
%{:name => :port, :default_value => 8888, :description => ~c"port to connect to"},
%{
:name => :number_of_neighbours,
:default_value => 10,
:description => ~c"number of neighbours a user will write to"
}
]
def init do
LoadTest.Init.init()
end
def start(user_id, opts) do
LoadTest.Start.start(user_id, opts)
end
end
Running the scenario!
Now we're ready to run the scenario. We just make sure amoc has been cleanly (re-)started, which also ensures the modules we created have been loaded, and we're good to go!
Application.stop(:amoc)
Application.ensure_all_started(:amoc)
:amoc.do(LoadTest, 6,
host: ~c"localhost",
port: 8888,
number_of_neighbours: 2
)
Don't forget to check the status of the run every once in a while, and to stop it once you're done:
:amoc_controller.get_status()
:amoc.stop()
The Instrumentation
Collect metrics
We have already seen that we can run a scenario; now let's look into collecting some basic metrics thanks to amoc's telemetry instrumentation. We first record the timestamp of the scenario start, so that the timestamps of user starts can be expressed relative to it, and then we define collectors for the users. For simplicity, we store everything in ets tables, to be analysed later.
defmodule AmocHandlers.ScenarioStats do
@scenario_stats :amoc_html_reporter_scenario_stats
def attach_handlers do
@scenario_stats =
:ets.new(@scenario_stats, [:named_table, :ordered_set, :public, write_concurrency: true])
:ok =
:telemetry.attach_many(
"amoc_html_reporter-scenario-init",
[
[:amoc, :scenario, :init, :start],
[:amoc, :scenario, :init, :stop],
[:amoc, :scenario, :init, :exception]
],
&__MODULE__.handle_event/4,
%{}
)
end
def handle_event(
[:amoc, :scenario, :init, :start],
%{monotonic_time: monotonic_time},
%{scenario: scenario},
_config
) do
milliseconds = System.convert_time_unit(monotonic_time, :native, :millisecond)
record = %{
scenario: :erlang.atom_to_binary(scenario),
status: "started",
reason: nil,
started_at: milliseconds,
duration: nil
}
:ets.insert(@scenario_stats, {scenario, record})
end
def handle_event(
[:amoc, :scenario, :init, :stop],
%{duration: duration},
%{scenario: scenario},
_config
) do
[{_, record}] = :ets.lookup(@scenario_stats, scenario)
milliseconds = System.convert_time_unit(duration, :native, :millisecond)
new_record = %{record | status: "finished", reason: :normal, duration: milliseconds}
:ets.insert(@scenario_stats, {scenario, new_record})
end
def handle_event(
[:amoc, :scenario, :init, :exception],
%{duration: duration},
%{scenario: scenario, kind: class, reason: reason},
_config
) do
[{_, record}] = :ets.lookup(@scenario_stats, scenario)
milliseconds = System.convert_time_unit(duration, :native, :millisecond)
new_record = %{record | status: "exited", reason: {class, reason}, duration: milliseconds}
:ets.insert(@scenario_stats, {scenario, new_record})
end
end
defmodule AmocHandlers.UserStats do
@scenario_stats :amoc_html_reporter_scenario_stats
@users_plot :amoc_html_reporter_users_plot
def attach_handlers do
@users_plot =
:ets.new(@users_plot, [
:named_table,
:ordered_set,
:public,
read_concurrency: true,
write_concurrency: true
])
:ok =
:telemetry.attach_many(
"amoc_html_reporter-scenario-user",
[
[:amoc, :scenario, :start, :start],
[:amoc, :scenario, :start, :stop],
[:amoc, :scenario, :start, :exception]
],
&AmocHandlers.UserStats.handle_event/4,
%{}
)
end
def handle_event(
[:amoc, :scenario, :start, :start],
%{monotonic_time: monotonic_time},
%{scenario: scenario, user_id: user_id},
_config
) do
milliseconds = System.convert_time_unit(monotonic_time, :native, :millisecond)
record = %{
id: user_id,
scenario: :erlang.atom_to_binary(scenario),
status: "started",
reason: nil,
started_at: milliseconds,
duration: nil
}
:ets.insert(@users_plot, {user_id, record})
end
def handle_event(
[:amoc, :scenario, :start, :stop],
%{duration: duration},
%{scenario: scenario, user_id: user_id},
_config
) do
status = "finished"
event_reason = :normal
handle_end(user_id, scenario, duration, status, event_reason)
end
def handle_event(
[:amoc, :scenario, :start, :exception],
%{duration: duration},
%{scenario: scenario, user_id: user_id, kind: class, reason: reason, stacktrace: s},
_config
) do
status = "exited"
event_reason = {class, reason, s}
handle_end(user_id, scenario, duration, status, event_reason)
end
defp handle_end(user_id, scenario, duration, status, reason) do
duration_ms = System.convert_time_unit(duration, :native, :millisecond)
[{_, scenario_record}] = :ets.lookup(@scenario_stats, scenario)
scenario_started_at = scenario_record.started_at
[{_, user_record}] = :ets.lookup(@users_plot, user_id)
started_at = user_record.started_at
new_record = %{
user_record
| status: status,
reason: reason,
# the user's start, as a millisecond offset from the scenario start
started_at: started_at - scenario_started_at,
duration: duration_ms
}
:ets.insert(@users_plot, {user_id, new_record})
end
end
defmodule AmocHandlers do
use GenServer
# Initialization
def start_link() do
case :erlang.whereis(__MODULE__) do
:undefined -> :ok
_pid -> GenServer.stop(__MODULE__)
end
GenServer.start_link(__MODULE__, :no_opts, name: __MODULE__)
end
def init(:no_opts) do
attach_handlers()
{:ok, :no_state}
end
# Callbacks
def handle_call(_msg, _from, state) do
{:reply, :ok, state}
end
def handle_cast(_msg, state) do
{:noreply, state}
end
def handle_info(_msg, state) do
{:noreply, state}
end
defp attach_handlers() do
AmocHandlers.UserStats.attach_handlers()
AmocHandlers.ScenarioStats.attach_handlers()
end
end
The Results
Don't forget to restart amoc (and telemetry) and to attach our handlers:
Application.stop(:amoc)
Application.stop(:telemetry)
{:ok, apps} = Application.ensure_all_started([:telemetry, :amoc])
{:ok, server_handler} = AmocHandlers.start_link()
{apps, server_handler}
:amoc.do(LoadTest, 5000,
host: ~c"localhost",
port: 8888,
number_of_neighbours: 10
)
And from time to time we can check the scenario status:
:amoc_controller.get_status()
And then, finally, stop the scenario (uncomment when you are done):
# :amoc.stop()
mapper = fn {_id, value} -> Map.delete(value, :reason) end
tab =
:ets.tab2list(:amoc_html_reporter_users_plot)
|> Enum.map(mapper)
|> Explorer.DataFrame.new()
require Explorer.DataFrame
done_so_far =
tab
|> Explorer.DataFrame.lazy()
|> Explorer.DataFrame.filter(duration >= 0)
|> Explorer.DataFrame.collect()
require Explorer.DataFrame
tab
|> Explorer.DataFrame.lazy()
|> Explorer.DataFrame.group_by("status")
|> Explorer.DataFrame.summarise(status_count: count(status))
VegaLite.new(width: 800)
|> VegaLite.data_from_values(done_so_far, only: ["started_at", "duration"])
|> VegaLite.mark(:line)
|> VegaLite.encode_field(:x, "started_at", type: :quantitative, scale: [type: :linear])
|> VegaLite.encode_field(:y, "duration", type: :quantitative, scale: [type: :linear])
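As a complement to the time series above, here is a sketch of a histogram of user durations over the same done_so_far data frame; the binning is left to Vega-Lite's defaults:
VegaLite.new(width: 800)
|> VegaLite.data_from_values(done_so_far, only: ["duration"])
|> VegaLite.mark(:bar)
|> VegaLite.encode_field(:x, "duration", type: :quantitative, bin: true)
|> VegaLite.encode(:y, aggregate: :count)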
Interpretation
We can see from this simple result that, on average, users took around 25-30 seconds to carry out their tasks, with no long spikes above that, meaning the users didn't have to wait long for the server to accept their messages. Note, however, that we don't know whether the messages were delivered, nor how long delivery took, because the server hides this information from us. Still, we have written a server and a scenario, and we have run the scenario, purely in Elixir and Livebook.
May this guide have served you well!