View Source Amoc

Mix.install([
  {:ranch, "~> 2.1"},
  {:jiffy, "~> 1.1"},
  {:gen_state_machine, "~> 2.0"},
  {:uuid, "~> 1.1"},
  {:amoc, "~> 3.0"},
  {:kino, "~> 0.11.3"},
  {:vega_lite, "~> 0.1.6"},
  {:kino_vega_lite, "~> 0.1.10"},
  {:kino_explorer, "~> 0.1.11"}
])

Getting Started

In this book, we're going to experiment with some barebones load testing cases. We will define a server, a scenario, some instrumentation, and we will load-test the server and get some results.

Let's get started!

The Server

For this demo, we will need something to load-test to begin with. So let's create a very basic super private messaging server. The server is a barebones message router and nothing more. It works with JSON payloads, and we define the protocol as follows:

  • On user connect, the user provides %{<<"connect">> => user_id} where user_id is any binary, for example a UUIDv4 or something even more impossible to guess:
    • The server checks only that no other user is connected with this user_id:
      • If the user_id is free, accept the connection;
      • If the user_id is taken, then it closes the connection;
  • On a connected session, the server takes any JSON payload, provided it has two mandatory fields, from and to:
    • The server checks only that the specified from matches the one the connection was established with:
      • If it matches, then it routes the entire payload to to, transparently;
      • If it doesn't, then it does not route the message, knowing it was a phishing attempt;
    • Note that in no case does the server tell the client anything about whether the peer was connected or whether a phishing attempt was detected, to avoid leaking any knowledge about the intention of the payload. It is the responsibility of the peer, if it wants to, to react to the message.

How does the routing work? Simply, it delivers the payload to the recipient, if a recipient is connected, otherwise it does nothing. As the recipient's user_id is arbitrarily set by the recipient, the sender can only write to the recipient if it already knows his user_id.

See the code below!

The Session Manager

defmodule MessagingServer.SM do
  @moduledoc """
  Session manager: a simple wrapper around an ETS table that stores
  `{user_id, pid}` pairs, in order to be able to deliver messages from one
  `user_id` to another.

  The GenServer started by `start_link/0` only creates (and therefore owns) the
  table; `put_new/2`, `get/1` and `delete/1` access ETS directly from the
  calling process.
  """

  use GenServer

  @table __MODULE__

  @type user_id() :: term()

  @doc """
  Puts a _new_ user in the session table.

  Returns `true` if `user_id` was free and is now bound to `pid`, and `false`
  if an entry for `user_id` already exists.
  """
  @spec put_new(user_id(), pid()) :: boolean()
  def put_new(user_id, pid) do
    :ets.insert_new(@table, {user_id, pid})
  end

  @doc """
  Gets the pid of a user by its ID.

  Returns the pid if the ID is registered, and `{:error, :not_found}` otherwise.
  """
  @spec get(user_id()) :: pid() | {:error, :not_found}
  def get(user_id) do
    # `:ets.lookup/2` is used rather than `:ets.lookup_element/4`, which is only
    # available from OTP 26 onwards.
    case :ets.lookup(@table, user_id) do
      [{_user_id, pid}] -> pid
      [] -> {:error, :not_found}
    end
  end

  @doc "Removes a user from the session table. Always returns `true`."
  @spec delete(user_id()) :: true
  def delete(user_id) do
    :ets.delete(@table, user_id)
  end

  @doc "Starts the process that creates and owns the session table."
  @spec start_link() :: GenServer.on_start()
  def start_link() do
    GenServer.start_link(__MODULE__, :no_args, [])
  end

  @impl true
  def init(_) do
    # Public and named so that every connection process can read and write it
    # directly, without going through this GenServer.
    :ets.new(@table, [
      :named_table,
      :public,
      :set,
      write_concurrency: true,
      read_concurrency: true
    ])

    {:ok, :no_state}
  end
end

The C2S code

This is what implements the protocol, as a very simple state machine

defmodule MessagingServer.C2S do
  @moduledoc """
  Implements the server-side protocol, with one state machine per connection.

  States:

    * `:connecting` - the socket is being obtained from ranch, or is open but
      the client has not yet claimed a `user_id`;
    * `{:connected, user_id}` - the client registered `user_id` in
      `MessagingServer.SM` and can send and receive routed payloads.

  Every JSON document decoded from the socket is re-injected into the state
  machine as an `:internal` event, so the protocol clauses match directly on
  the decoded maps.
  """

  use GenStateMachine, callback_mode: :handle_event_function

  require Record
  require Logger

  Record.defrecord(:data, ref: nil, transport: nil, socket: nil)

  @type state() :: :connecting | {:connected, term()}

  @doc """
  Starts the state machine for one connection.

  `ref` and `transport` are stored in the process data; `opts` is ignored.
  """
  def start_link(ref, transport, opts) do
    GenStateMachine.start_link(__MODULE__, {ref, transport, opts}, [])
  end

  ## The handshake is deferred to an internal event so that `init/1` returns
  ## before `:ranch.handshake/1` is called.
  def init({ref, transport, _}) do
    do_connect = {:next_event, :internal, {:do_connect, ref}}
    {:ok, :connecting, data(ref: ref, transport: transport), do_connect}
  end

  ## get ranch handshake and socket
  def handle_event(:internal, {:do_connect, ref}, :connecting, data) do
    {:ok, socket} = :ranch.handshake(ref)
    full_data = data(data, socket: socket)
    active_once(full_data)
    {:next_state, :connecting, full_data}
  end

  ## From device to server
  ## Connecting a session
  def handle_event(:internal, %{"connect" => user_id}, :connecting, data) do
    case MessagingServer.SM.put_new(user_id, self()) do
      true ->
        payload = %{"connected" => user_id}
        transport_send(data, payload)
        {:next_state, {:connected, user_id}, data}

      false ->
        payload = %{"rejected" => "duplicate"}
        transport_send(data, payload)
        {:stop, :rejected_connection_because_of_duplicate}
    end
  end

  ## Sending messages
  def handle_event(:internal, payload, {:connected, user_id}, _) do
    case payload do
      ## user_id =:= from -> no phishing
      %{"from" => ^user_id, "to" => to} ->
        case MessagingServer.SM.get(to) do
          ## recipient is not connected: drop silently
          {:error, :not_found} ->
            :keep_state_and_data

          pid ->
            send(pid, {:route, payload})
            :keep_state_and_data
        end

      ## missing fields or spoofed "from": drop silently
      _ ->
        :keep_state_and_data
    end
  end

  ## From peer to peer
  ## Receiving messages
  def handle_event(:info, {:route, payload}, {:connected, user_id}, data) do
    case payload do
      ## user_id =:= to
      %{"to" => ^user_id} ->
        transport_send(data, payload)
        :keep_state_and_data

      ## message wasn't to me, ignore without revealing who wrote to me
      _ ->
        :keep_state_and_data
    end
  end

  ## TCP related handlers
  def handle_event(:info, {:tcp_closed, socket}, _, data(socket: socket)) do
    {:stop, :normal}
  end

  def handle_event(:info, {:tcp_error, socket, reason}, _, data(socket: socket)) do
    {:stop, reason}
  end

  def handle_event(:info, {:tcp, socket, packet}, _, data = data(socket: socket)) do
    case decode_all(packet) do
      :bad_json_received ->
        {:stop, :bad_json_received}

      events ->
        ## the socket is in `active: :once` mode, so re-arm it for the next packet
        active_once(data)
        {:keep_state_and_data, events}
    end
  end

  ## Match-all ignore-rest
  ## Only logs: this must not sleep, as that would stall every other event
  ## queued for this connection.
  def handle_event(event_type, event_content, _state, _data) do
    Logger.alert("we got something")
    Logger.alert(%{type: event_type, content: event_content, where: 105})
    :keep_state_and_data
  end

  ## Cleanup is done in steps, each clause handing over to the next one:
  ## close the socket (if any), then unregister the user (if connected).
  def terminate(reason, state, d = data(socket: socket, transport: transport))
      when socket != nil and transport != nil do
    try do
      transport.close(socket)
    catch
      ## best effort: swallow throws, errors and exits alike
      _kind, _reason -> :ok
    end

    terminate(reason, state, data(d, socket: nil, transport: nil))
  end

  def terminate(reason, {:connected, user_id}, data) do
    MessagingServer.SM.delete(user_id)
    terminate(reason, :connecting, data)
  end

  def terminate(_, _, _) do
    :ok
  end

  ## Asks the transport to deliver exactly one more packet as a message
  defp active_once(data(transport: transport, socket: socket)) do
    :ok = transport.setopts(socket, active: :once)
  end

  ## Encodes the payload as JSON and writes it to the socket
  defp transport_send(data(transport: transport, socket: socket), payload) do
    packet = :jiffy.encode(payload)
    transport.send(socket, packet)
  end

  ## Decodes every JSON document contained in the packet, returning one
  ## `{:next_event, :internal, payload}` action per document, in arrival order,
  ## or `:bad_json_received` if anything fails to decode.
  defp decode_all(packet, events \\ []) do
    try do
      case :jiffy.decode(packet, [:return_maps, :return_trailer]) do
        {:has_trailer, payload, trailer} ->
          payload_event = {:next_event, :internal, payload}
          decode_all(trailer, [payload_event | events])

        payload ->
          payload_event = {:next_event, :internal, payload}
          Enum.reverse([payload_event | events])
      end
    catch
      _, _ -> :bad_json_received
    end
  end
end

The Supervisor and Application modules

defmodule MessagingServer.Sup do
  @moduledoc """
  Supervises the session manager and the ranch TCP listener on port 8888.

  The strategy is `:one_for_all` with an intensity of 0, so no restarts are
  tolerated: the first child failure takes this supervisor down.
  """

  use Supervisor

  def start_link(_) do
    Supervisor.start_link(__MODULE__, [], name: __MODULE__)
  end

  @impl true
  def init([]) do
    listener =
      :ranch.child_spec(
        MessagingServer,
        :ranch_tcp,
        %{socket_opts: [port: 8888]},
        MessagingServer.C2S,
        []
      )

    session_manager = %{
      id: MessagingServer.SM,
      start: {MessagingServer.SM, :start_link, []}
    }

    flags = %{strategy: :one_for_all, intensity: 0, period: 1}

    # The session manager goes first: connections accepted by the listener
    # use its table.
    {:ok, {flags, [session_manager, listener]}}
  end
end
defmodule MessagingServer do
  @moduledoc """
  Application entry point: starts `MessagingServer.Sup` under a top-level
  `:one_for_one` supervisor.
  """

  use Application

  def start(_type, _args) do
    Supervisor.start_link([MessagingServer.Sup], strategy: :one_for_one)
  end
end
# Start the messaging server right away, by calling the application callback directly.
MessagingServer.start(:normal, [])

The Scenario

Developing a scenario

TL;DR: https://hexdocs.pm/amoc/scenario.html

Say you've read the official guide: you now know that we need two special callbacks, init/0 and start/1,2. Let's separate them into different modules, for readability.

The user process

We first define a simple struct that will hold the permanent data for the connection, like socket and config state. Then, we define a state machine that knows how to send messages. After a connection is established:

  • Every ~1 second it will send a message of 16-24 characters to some randomly selected neighbour
  • If it receives a message, it adds this message to the list of pending messages to send and continues going through its ~1 second loop
  • The user will have capped the maximum number of messages that it will send to ~20..30
  • When it can't send any more messages and hasn't received any for long enough, ~3 seconds, it will die
defmodule LoadTest.User.Data do
  @moduledoc """
  Permanent data of a simulated user: its `id`, the `neighbours` it writes to,
  and the `socket` of its connection. All fields default to `nil`.
  """

  defstruct id: nil, neighbours: nil, socket: nil
end
defmodule LoadTest.User do
  @moduledoc """
  State machine simulating one user of the messaging server.

  The process is expected to enter the loop directly in state `:connecting`
  with a `LoadTest.User.Data` as data (so `init/1` is never called). States:

    * `:connecting` - about to send the `"connect"` payload;
    * `:waiting_for_connection_ack` - waiting for the server's `"connected"` reply;
    * `{:connected, left, pending}` - `left` is how many messages may still be
      sent, `pending` is the list of payloads waiting to be sent.

  Every JSON document decoded from the socket is re-injected as an `:internal`
  event, so the protocol clauses match directly on decoded maps.
  """

  alias LoadTest.User.Data, as: Data
  use GenStateMachine, callback_mode: :handle_event_function

  ## NOTE(review): this struct is not used anywhere in this module (the state
  ## machine data is a `LoadTest.User.Data`); it is kept so that any external
  ## reference to `%LoadTest.User{}` keeps compiling.
  defstruct([:id, :neighbours, :socket])

  def init(_) do
    raise "This should be unreachable"
  end

  def handle_event(:info, {:tcp_error, socket, reason}, _, %Data{socket: socket}) do
    {:stop, reason}
  end

  ## The server closed the connection (for example after rejecting a duplicate
  ## id): there is nothing left to do. Without this clause the event would be
  ## ignored and the user could wait forever.
  def handle_event(:info, {:tcp_closed, socket}, _, %Data{socket: socket}) do
    {:stop, :normal}
  end

  def handle_event(:info, {:tcp, socket, packet}, _, %Data{socket: socket}) do
    case decode_all(packet) do
      :bad_json_received ->
        {:stop, :bad_json_received}

      events ->
        {:keep_state_and_data, events}
    end
  end

  def handle_event(:info, {:EXIT, _pid, reason}, _, _) do
    {:stop, reason}
  end

  ## Connecting a session
  def handle_event(:internal, :do_connect, :connecting, data = %Data{id: my_id}) do
    payload = %{"connect" => my_id}
    transport_send(data, payload)
    {:next_state, :waiting_for_connection_ack, data}
  end

  ## The server accepted us: prepare one message per neighbour and trigger the
  ## first send immediately.
  def handle_event(
        :internal,
        %{"connected" => my_id},
        :waiting_for_connection_ack,
        %Data{id: my_id, neighbours: neighbours} = data
      ) do
    left = num_of_messages()
    pending = build_pendings(my_id, neighbours)
    schedule = [{:next_event, {:timeout, :time_to_send_one}, :time_to_send_one}]
    {:next_state, {:connected, left, pending}, data, schedule}
  end

  ## This is it, it really can't send any more messages, nor has it received any, DIE!
  def handle_event(:timeout, :inactive_long_enough, {:connected, 0, _}, _) do
    :stop
  end

  ## It is still receiving messages, act like you stay a bit longer to read them but then die
  def handle_event(_, _, {:connected, 0, _}, _) do
    schedule = [{:timeout, time_to_die_if_inactive(), :inactive_long_enough}]
    {:keep_state_and_data, schedule}
  end

  ## Nothing has really happened in a while, disconnect the user
  def handle_event(:timeout, :inactive_long_enough, _, _) do
    :stop
  end

  ## Time to send a message, but there is nothing to send (this happens when the
  ## user has no neighbours at all): just wait to be written to, or to time out.
  def handle_event(
        {:timeout, :time_to_send_one},
        :time_to_send_one,
        {:connected, _left, []},
        _data
      ) do
    schedule = [{:timeout, time_to_die_if_inactive(), :inactive_long_enough}]
    {:keep_state_and_data, schedule}
  end

  ## Time to send a message
  def handle_event(
        {:timeout, :time_to_send_one},
        :time_to_send_one,
        {:connected, left, pending},
        data
      ) do
    send_now = Enum.random(pending)
    new_pending = List.delete(pending, send_now)
    transport_send(data, send_now)

    case {left, new_pending} do
      ## that was the last message we were allowed to send
      {1, _} ->
        schedule = [{:timeout, time_to_die_if_inactive(), :inactive_long_enough}]
        {:next_state, {:connected, 0, []}, data, schedule}

      ## nothing else to send for now, wait for incoming messages
      {_, []} ->
        schedule = [{:timeout, time_to_die_if_inactive(), :inactive_long_enough}]
        {:next_state, {:connected, left - 1, new_pending}, data, schedule}

      _ ->
        schedule = [{{:timeout, :time_to_send_one}, time_to_respond(), :time_to_send_one}]
        {:next_state, {:connected, left - 1, new_pending}, data, schedule}
    end
  end

  ## I received a message, note that we match the Data.id with the "to" field
  def handle_event(
        :internal,
        %{"to" => my_id} = payload,
        {:connected, left, pending},
        %Data{id: my_id} = data
      ) do
    new_pending = [build_answer(my_id, payload) | pending]

    case pending do
      ## then we know we didn't have anything scheduled, do so now
      [] ->
        schedule = [{{:timeout, :time_to_send_one}, time_to_respond(), :time_to_send_one}]
        {:next_state, {:connected, left, new_pending}, data, schedule}

      ## So something else is coming already, this only appends to pending
      _ ->
        {:next_state, {:connected, left, new_pending}, data}
    end
  end

  def handle_event(_, _, _, _) do
    :keep_state_and_data
  end

  def terminate(_, _, %Data{socket: socket}) do
    :gen_tcp.close(socket)
  end

  ## Encodes the payload as JSON and writes it to the socket
  defp transport_send(%Data{socket: socket}, payload) do
    packet = :jiffy.encode(payload)
    :gen_tcp.send(socket, packet)
  end

  ## Decodes every JSON document contained in the packet, returning one
  ## `{:next_event, :internal, payload}` action per document, in arrival order,
  ## or `:bad_json_received` if anything fails to decode.
  defp decode_all(packet, events \\ []) do
    try do
      case :jiffy.decode(packet, [:return_maps, :return_trailer]) do
        {:has_trailer, payload, trailer} ->
          payload_event = {:next_event, :internal, payload}
          decode_all(trailer, [payload_event | events])

        payload ->
          payload_event = {:next_event, :internal, payload}
          Enum.reverse([payload_event | events])
      end
    catch
      _, _ -> :bad_json_received
    end
  end

  ## One initial message per neighbour
  defp build_pendings(my_id, range) do
    Enum.map(range, payload_builder(my_id))
  end

  ## A reply to the sender of the given payload, referencing its "msg_id"
  defp build_answer(my_id, %{"from" => from_id, "msg_id" => msg_id}) do
    %{
      "from" => my_id,
      "to" => from_id,
      "msg" => generate_msg(),
      "msg_id" => UUID.uuid4(),
      "resp_to" => msg_id
    }
  end

  defp payload_builder(my_id) do
    fn i ->
      id = Integer.to_string(i)
      %{"from" => my_id, "to" => id, "msg" => generate_msg(), "msg_id" => UUID.uuid4()}
    end
  end

  ## Approximately 20..30 messages
  defp num_of_messages() do
    :rand.uniform(10) + 20
  end

  ## Approximately ~1s
  defp time_to_respond() do
    :rand.uniform(200) + 900
  end

  ## Approximately ~3s
  defp time_to_die_if_inactive() do
    :rand.uniform(200) + 2900
  end

  ## generate a message between 16 and 24 characters long
  ## (11 to 16 random bytes, Base64-encoded; `Base.encode64/2` only honours
  ## `:padding`, so no other option is passed)
  defp generate_msg() do
    (10 + :rand.uniform(6))
    |> :crypto.strong_rand_bytes()
    |> Base.encode64()
  end
end

Initialisation

We first implement what is going to be the initialisation module.

defmodule LoadTest.Init do
  @moduledoc """
  Scenario initialisation: reads the `:host` and `:port` configuration values
  and returns them as the scenario state.
  """

  def init do
    {:ok, %{host: :amoc_config.get(:host), port: :amoc_config.get(:port)}}
  end
end

User runs

And here, we define a function that each user will execute:

defmodule LoadTest.Start do
  @moduledoc """
  Body of a single simulated user: connects to the server and turns the calling
  process into a `LoadTest.User` state machine.
  """

  def start(my_id, %{host: host, port: port}) do
    {:ok, socket} = :gen_tcp.connect(host, port, [:binary, active: true])

    user = %LoadTest.User.Data{
      id: Integer.to_string(my_id),
      neighbours: build_neighbours(my_id),
      socket: socket
    }

    first_actions = [{:next_event, :internal, :do_connect}]

    # `enter_loop` only leaves by exiting; a normal exit means the user
    # finished its run.
    try do
      :gen_statem.enter_loop(LoadTest.User, [], :connecting, user, first_actions)
    catch
      :exit, :normal -> :ok
    end
  end

  # The `number_of_neighbours` ids on each side of `my_id`, excluding `my_id`.
  defp build_neighbours(my_id) do
    radius = :amoc_config.get(:number_of_neighbours)
    lowest = my_id - radius
    highest = my_id + radius

    Enum.reject(lowest..highest, fn id -> id == my_id end)
  end
end

The load testing module

At last, this is the module that implements the :amoc_scenario behaviour; its two main callbacks simply delegate to the two modules defined above.

defmodule LoadTest do
  @moduledoc """
  The load-test scenario: declares its configuration variables and delegates
  the callbacks to `LoadTest.Init` and `LoadTest.Start`.
  """

  @behaviour :amoc_scenario

  # Persisted so that the attribute is still present in the compiled module.
  Module.register_attribute(__MODULE__, :required_variable, persist: true)

  @required_variable [
    %{name: :host, default_value: ~c"localhost", description: ~c"host to connect to"},
    %{name: :port, default_value: 8888, description: ~c"port to connect to"},
    %{
      name: :number_of_neighbours,
      default_value: 10,
      description: ~c"number of neighbours a user will write to"
    }
  ]

  def init, do: LoadTest.Init.init()

  def start(user_id, opts), do: LoadTest.Start.start(user_id, opts)
end

Running the scenario!

Now we're ready to run this scenario. We will just ensure that amoc has been cleanly (re-)started, as this also ensures that the newly created module has been loaded, and we're good to go!

# Restart amoc so that the run starts from a clean application state.
Application.stop(:amoc)
Application.ensure_all_started(:amoc)

# Run the LoadTest scenario with 6 users, overriding its configuration variables.
:amoc.do(LoadTest, 6,
  host: ~c"localhost",
  port: 8888,
  number_of_neighbours: 2
)

Don't forget to check the status of the run every once in a while:

# Query the controller for the current status of the run.
:amoc_controller.get_status()
# Stop the running scenario.
:amoc.stop()

The Instrumentation

Collect metrics

We have already seen that we can run a scenario, now, let's see about collecting some basic metrics thanks to amoc's telemetry instrumentation.

We will collect first timestamps of the scenario start, so that timestamps of user starts can be aggregated to it, and then we also define collectors for users. We store everything on ets tables, for simplicity, to analyse later.

defmodule AmocHandlers.ScenarioStats do
  @moduledoc """
  Telemetry handlers for the `[:amoc, :scenario, :init, _]` events.

  One record per scenario is kept in a named ETS table, keyed by the scenario
  module, holding its status, start time and duration in milliseconds.
  """

  @scenario_stats :amoc_html_reporter_scenario_stats

  @doc "Creates the ETS table and attaches the handlers. Returns `:ok`."
  def attach_handlers do
    table_opts = [:named_table, :ordered_set, :public, write_concurrency: true]
    @scenario_stats = :ets.new(@scenario_stats, table_opts)

    events =
      for phase <- [:start, :stop, :exception] do
        [:amoc, :scenario, :init, phase]
      end

    :ok =
      :telemetry.attach_many(
        "amoc_html_reporter-scenario-init",
        events,
        &__MODULE__.handle_event/4,
        %{}
      )
  end

  @doc "Records the start, normal end, or failure of a scenario initialisation."
  def handle_event(
        [:amoc, :scenario, :init, :start],
        %{monotonic_time: monotonic_time},
        %{scenario: scenario},
        _config
      ) do
    started_at_ms = System.convert_time_unit(monotonic_time, :native, :millisecond)

    entry = %{
      scenario: :erlang.atom_to_binary(scenario),
      status: "started",
      reason: nil,
      started_at: started_at_ms,
      duration: nil
    }

    :ets.insert(@scenario_stats, {scenario, entry})
  end

  def handle_event(
        [:amoc, :scenario, :init, :stop],
        %{duration: duration},
        %{scenario: scenario},
        _config
      ) do
    close_entry(scenario, duration, "finished", :normal)
  end

  def handle_event(
        [:amoc, :scenario, :init, :exception],
        %{duration: duration},
        %{scenario: scenario, kind: class, reason: reason},
        _config
      ) do
    close_entry(scenario, duration, "exited", {class, reason})
  end

  # Updates the entry stored at the start event with its final status.
  defp close_entry(scenario, duration, status, reason) do
    [{_, entry}] = :ets.lookup(@scenario_stats, scenario)
    duration_ms = System.convert_time_unit(duration, :native, :millisecond)
    closed = %{entry | status: status, reason: reason, duration: duration_ms}
    :ets.insert(@scenario_stats, {scenario, closed})
  end
end
defmodule AmocHandlers.UserStats do
  @moduledoc """
  Telemetry handlers for the `[:amoc, :scenario, :start, _]` events, emitted
  around each user's run.

  One record per user is kept in a named ETS table, keyed by the user id. When
  a user finishes, its `started_at` is rewritten as an offset, in milliseconds,
  from the start of the scenario, which is read from the table written by
  `AmocHandlers.ScenarioStats`.
  """

  @scenario_stats :amoc_html_reporter_scenario_stats
  @users_plot :amoc_html_reporter_users_plot

  @doc "Creates the ETS table and attaches the handlers. Returns `:ok`."
  def attach_handlers do
    @users_plot =
      :ets.new(@users_plot, [
        :named_table,
        :ordered_set,
        :public,
        read_concurrency: true,
        write_concurrency: true
      ])

    :ok =
      :telemetry.attach_many(
        "amoc_html_reporter-scenario-user",
        [
          [:amoc, :scenario, :start, :start],
          [:amoc, :scenario, :start, :stop],
          [:amoc, :scenario, :start, :exception]
        ],
        &__MODULE__.handle_event/4,
        %{}
      )
  end

  @doc "Records the start, normal end, or failure of a user's run."
  def handle_event(
        [:amoc, :scenario, :start, :start],
        %{monotonic_time: monotonic_time},
        %{scenario: scenario, user_id: user_id},
        _config
      ) do
    milliseconds = System.convert_time_unit(monotonic_time, :native, :millisecond)

    record = %{
      id: user_id,
      scenario: :erlang.atom_to_binary(scenario),
      status: "started",
      reason: nil,
      started_at: milliseconds,
      duration: nil
    }

    :ets.insert(@users_plot, {user_id, record})
  end

  def handle_event(
        [:amoc, :scenario, :start, :stop],
        %{duration: duration},
        %{scenario: scenario, user_id: user_id},
        _config
      ) do
    handle_end(user_id, scenario, duration, "finished", :normal)
  end

  def handle_event(
        [:amoc, :scenario, :start, :exception],
        %{duration: duration},
        %{scenario: scenario, user_id: user_id, kind: class, reason: reason, stacktrace: s},
        _config
      ) do
    handle_end(user_id, scenario, duration, "exited", {class, reason, s})
  end

  # Closes the user's record: final status, duration in milliseconds, and the
  # start time made relative to the scenario start.
  defp handle_end(user_id, scenario, duration, status, reason) do
    duration_ms = System.convert_time_unit(duration, :native, :millisecond)
    [{_, scenario_record}] = :ets.lookup(@scenario_stats, scenario)
    scenario_started_at = scenario_record.started_at
    [{_, user_record}] = :ets.lookup(@users_plot, user_id)
    started_at = user_record.started_at

    new_record = %{
      user_record
      | status: status,
        reason: reason,
        # User start minus scenario start: users start after the scenario, so
        # this is the non-negative delay since the beginning of the run.
        started_at: started_at - scenario_started_at,
        duration: duration_ms
    }

    :ets.insert(@users_plot, {user_id, new_record})
  end
end
defmodule AmocHandlers do
  @moduledoc """
  Process that attaches the telemetry handlers when it starts.

  The handlers' `attach_handlers/0` functions are called from `init/1`, so they
  run inside this process. Starting it again first stops any previous instance
  registered under this module's name.
  """

  use GenServer

  # Initialization
  def start_link() do
    if Process.whereis(__MODULE__) != nil do
      GenServer.stop(__MODULE__)
    end

    GenServer.start_link(__MODULE__, :no_opts, name: __MODULE__)
  end

  @impl true
  def init(:no_opts) do
    AmocHandlers.UserStats.attach_handlers()
    AmocHandlers.ScenarioStats.attach_handlers()
    {:ok, :no_state}
  end

  # Callbacks: this process carries no state and ignores every message.
  @impl true
  def handle_call(_msg, _from, state), do: {:reply, :ok, state}

  @impl true
  def handle_cast(_msg, state), do: {:noreply, state}

  @impl true
  def handle_info(_msg, state), do: {:noreply, state}
end

The Results

Don't forget to restart amoc

# Restart telemetry and amoc so that the instrumented run starts from a clean state.
Application.stop(:amoc)
Application.stop(:telemetry)
# Match on the `:ok` atom: a bare `ok` is a variable and would silently accept
# an `{:error, _}` tuple as well.
{:ok, apps} = Application.ensure_all_started([:telemetry, :amoc])
# Start the process that creates the stats tables and attaches the telemetry handlers.
{:ok, server_handler} = AmocHandlers.start_link()
{apps, server_handler}
# Run the LoadTest scenario with 5000 users, overriding its configuration variables.
:amoc.do(LoadTest, 5000,
  host: ~c"localhost",
  port: 8888,
  number_of_neighbours: 10
)

And from time to time we can check the scenario status

# Query the controller for the current status of the run.
:amoc_controller.get_status()

And then finally stop the scenario

# :amoc.stop()
# Keep only the record of each ETS entry, without its :reason field.
# NOTE(review): presumably :reason is dropped because it holds atoms/tuples that
# a data frame column cannot represent - confirm.
mapper = fn {_id, value} -> Map.delete(value, :reason) end

# Load every user record collected so far into a data frame.
tab =
  :ets.tab2list(:amoc_html_reporter_users_plot)
  |> Enum.map(mapper)
  |> Explorer.DataFrame.new()
# Required for the query macros (filter/summarise) used below.
require Explorer.DataFrame

# Keep only the rows whose duration is >= 0.
# NOTE(review): presumably this selects the users that already finished, whose
# duration is no longer nil - confirm.
done_so_far =
  tab
  |> Explorer.DataFrame.lazy()
  |> Explorer.DataFrame.filter(duration >= 0)
  |> Explorer.DataFrame.collect()
require Explorer.DataFrame

# Count the users per status.
tab
|> Explorer.DataFrame.lazy()
|> Explorer.DataFrame.group_by("status")
|> Explorer.DataFrame.summarise(status_count: count(status))
# Line chart of each user's duration (y) against its start time (x).
VegaLite.new(width: 800)
|> VegaLite.data_from_values(done_so_far, only: ["started_at", "duration"])
|> VegaLite.mark(:line)
|> VegaLite.encode_field(:x, "started_at", type: :quantitative, scale: [type: :linear])
|> VegaLite.encode_field(:y, "duration", type: :quantitative, scale: [type: :linear])

Interpretation

We can see in this simple result that, on average, users took around 25-30 seconds to carry out their tasks, with no spikes far above that, meaning that the users didn't have to wait long for the server to accept their messages. Note, however, that we don't know whether the messages were delivered, nor how long they took, because the server hides this information from us. But we have written a server and a scenario, and we have run the scenario, purely in Elixir and Livebook.

May this guide have served you well!