riverside v1.2.6 Riverside behaviour

Riverside - Plain WebSocket Server Framework for Elixir

Getting Started

Handler

At first, you need to prepare your own Handler module with use Riverside line.

in handle_message/3, process messages sent by client. This doesn't depend on some protocol like Socket.io. So do client-side, you don't need to prepared some libraries.

defmodule MySocketHandler do

  # set 'otp_app' param like Ecto.Repo
  use Riverside, otp_app: :my_app

  @impl Riverside
  def handle_message(msg, session, state) do

    # `msg` is a 'TEXT' or 'BINARY' frame sent by client,
    # process it as you like
    deliver_me(msg)

    {:ok, session, state}

  end

end

Application child_spec

And in your Application module, set child spec for your supervisor.

defmodule MyApp do

  use Application

  def start(_type, _args) do
    [
      # ...
       {Riverside, [handler: MySocketHandler]}
    ]
    |> Supervisor.start_link([
      strategy: :one_for_one,
      name:     MyApp.Spervisor
    ])
  end

end

Configuration

config :my_app, MySocketHandler,
  port: 3000,
  path: "/my_ws",
  max_connections: 10000, # don't accept connections if server already has this number of connections
  max_connection_age: :infinity, # force to disconnect a connection if the duration passed. if :infinity is set, do nothing.
  idle_timeout: 120_000, # disconnect if no event comes on a connection during this duration
  reuse_port: false, # TCP SO_REUSEPORT flag
  show_debug_logs: false,
  transmission_limit: [
    capacity: 50,  # if 50 frames are sent on a connection
    duration: 2000 # in 2 seconds, disconnect it.
  ]

I’ll show you detailed description below. But you will know most of them when you see them.

Run

Launch your application, then the WebSocket service is provided with an endpoint like the following.

ws://localhost:3000/my_ws

And at the same time, we can also access to

http://localhost:3000/health

If you send a HTTP GET request to this URL, it returns response with status code 200, and text content "OK". This is just for health check.

And

http://localhost:3000/metrics

This endpoint shows prometheus-formatted metrics.

These features are defined in a Plug Router named Riverside.Router, and this is configured as default router param for child spec. So, you can defined your own Plug Router if you set as below.

In your Application module

defmodule MyApp do

  use Application

  def start(_type, _args) do
    [
      # ...
       {Riverside, [
         handler: MySocketHandler,
         router: MyRouter, # Set your Plug Router here
        ]}
    ]
    |> Supervisor.start_link([
      strategy: :one_for_one,
      name:     MyApp.Spervisor
    ])
  end

end

Handler's Callbacks

You can also define callback functions other than handle_message/3.

For instance, there are functions named init, terminate, and handle_info. If you are accustomed to GenServer, you can easily imagine what they are, though their interface is little bit different.

defmodule MySocketHandler do

  use Riverside, otp_app: :my_app

  @impl Riverside
  def init(session, state) do
    # initialization
    {:ok, session, state}
  end

  @impl Riverside
  def handle_message(msg, session, state) do
    deliver_me(msg)
    {:ok, session, state}

  end

  @impl Riverside
  def handle_info(into, session, state) do
    # handle message sent to this process
    {:ok, session, state}
  end

  @impl Riverside
  def terminate(reason, session, state) do
  # cleanup
    :ok
  end

end

Authentication and Session

Here, I'll describe authenticate/1 callback function.

defmodule MySocketHandler do

  use Riverside, otp_app: :my_app

  @impl Riverside
  def authenticate(req) do
    {username, password} = req.basic
    case MyAuthenticator.authenticate(username, password) do

      {:ok, user_id} ->
        state = %{}
        {:ok, user_id, state}

      {:error, :invalid_password} ->
        error = auth_error_with_code(401)
        {:error, error}
    end
  end

  @impl Riverside
  def init(session, state) do
    {:ok, session, state}
  end

  @impl Riverside
  def handle_message(msg, session, state) do
    deliver_me(msg)
    {:ok, session, state}

  end

  @impl Riverside
  def handle_info(into, session, state) do
    {:ok, session, state}
  end

  @impl Riverside
  def terminate(reason, session, state) do
    :ok
  end

end

The argument of authenticate/1 is a struct of Riverside.AuthRequest.t. And it has Map members

  • queries: Map includes HTTP request's query params
  • headers: Map includes HTTP headers

# When client access with a URL such like ws://localhost:3000/my_ws?token=FOOBAR,
# And you want to authenticate the `token` parameter ("FOOBAR", this time)

@impl Riverside
def authenticate(req) do
  # You can pick the parameter like as below
  token = req.queries["token"]
  # ...
end
# Or else you want to authenticate with `Authorization` HTTP header.

@impl Riverside
def authenticate(req) do
  # You can pick the header value like as below
  auth_header = req.headers["authorization"]
  # ...
end

The fact is that, you don't need to parse Authorization header by yourself, if you want to do Basic or Bearer authentication.


# Pick up `username` and `password` from `Basic` Authorization header.
# If it doesn't exist, `username` and `password` become empty strings.

@impl Riverside
def authenticate(req) do
  {username, password} = req.basic
  # ...
end
# Pick up token value from `Bearer` Authorization header
# If it doesn't exist, `token` become empty string.

@impl Riverside
def authenticate(req) do
  token = req.bearer_token
  # ...
end

Authentication failure

If authentication failure, you need to return {:error, Riverside.AuthError.t}. You can build Riverside.AuthError struct with auth_error_with_code/1. Pass proper HTTP status code.

@impl Riverside
def authenticate(req) do

  token = req.bearer_token

  case MyAuth.authenticate(token) do

    {:error, :invalid_token} ->
       error = auth_error_with_code(401)
       {:error, error}

    # _ -> ...

  end

end

You can use put_auth_error_header/2 to put response header

error = auth_erro_with_code(400)
      |> puth_auth_error_header("WWW-Authenticate", "Basic realm=\"example.org\"")

And two more shortcuts, put_auth_error_basic_header and put_auth_error_bearer_header.

error = auth_erro_with_code(401)
      |> puth_auth_error_basic_header("example.org")

# This puts `WWW-Authenticate: Basic realm="example.org"`
error = auth_erro_with_code(401)
      |>  puth_auth_error_bearer_header("example.org")

# This puts `WWW-Authenticate: Bearer realm="example.org"`
error = auth_erro_with_code(400)
      |>  puth_auth_error_bearer_header("example.org", "invalid_token")

# This puts `WWW-Authenticate: Bearer realm="example.org", error="invalid_token"`

Successful authentication

@impl Riverside
def authenticate(req) do

  token = req.bearer_token

  case MyAuth.authenticate(token) do

    {:ok, user_id} ->
      session_id = create_random_string()
      state = %{}
      {:ok, user_id, session_id, state}

    # _ -> ...

  end
end

If authentication results in success, return {:ok, user_id, session_id, state}. You can put any data into state, same as you do in init in GenServer. session_id should be random string. You also can return {:ok, user_id, state}, and Then session_id will be generated automatically.

And init/3 will be called after successful auth response.

session

Now I can describe about the session parameter included for each callback functions.

This is a Riverside.Session.t struct, and it includes some parameters like user_id and session_id.

When you omit to define authenticate/1, both user_id and session_id will be set random value.

@impl Riverside
def handle_message(msg, session, state) do
  # session.user_id
  # session.session_id
end

Message and Delivery

Message Format

If a client sends a simple TEXT frame with JSON format like the following

{
  "to": 1111,
  "body": "Hello"
}

You can handle this JSON message as a Map.

@impl Riverside
def handle_message(incoming_message, session, state) do

  dest_user_id = incoming_message["to"]
  body         = incoming_message["body"]

  outgoing_message = %{
    "from" => "#{session.user_id}",
    "body" => body,
  }

  deliver_user(dest_user_id, outgoing_message)

  {:ok, session, state}
end

Then the user who is set as destination(user_id == 1111, in this example) receives TEXT frame

{
  "from": 2222,
  "body": "Hello"
}

This is because Riverside.Codec.JSON is set for codec config as default.

config :my_app, MySocketHandler,
  codec: Riverside.Codec.JSON

This codec decodes incoming message, and encodes outgoing message.

If you want to accept TEXT frames but don't want encode/decode them. Should set Riverside.Codec.RawText

config :my_app, MySocketHandler,
  codec: Riverside.Codec.RawText

If you want to accept BINARY frames but don't want encode/decode them. Should set Riverside.Codec.RawBinary

config :my_app, MySocketHandler,
  codec: Riverside.Codec.RawBinary

Custom Codec

The fact is that, JSON codec module is written with small amount of code. Take a look at the inside.

defmodule Riverside.Codec.JSON do

  @behaviour Riverside.Codec

  @impl Riverside.Codec
  def frame_type do
    :text
  end

  @impl Riverside.Codec
  def encode(msg) do
    case Poison.encode(msg) do

      {:ok, value} ->
        {:ok, value}

      {:error, _exception} ->
        {:error, :invalid_message}

    end
  end

  @impl Riverside.Codec
  def decode(data) do
      case Poison.decode(data) do

        {:ok, value} ->
          {:ok, value}

        {:error, _exception} ->
          {:error, :invalid_message}

      end
  end

end

No explanation needed to write your own codec. It's too simple.

Delivery

There is a module named Riverside.LocalDelivery. With its deliver/2 function, you can deliver messages to sessions connected to the server.

def handle_message(msg, session, state) do

  dest_user_id = msg["to"]
  body = msg["body"]

  outgoing = %{
    from: session.user_id,
    body: body,
  }

 Riverside.LocalDelivery.deliver(
    {:user, dest_user_id},
    {:text, Poison.encode!(outgoing)}
  )

  {:ok, session, state}
end

First argument is a tuple which represents a destination, and second is a tuple which represents a frame.

frame should be {:text, body} or {:binary, body}. choose proper one.

OK, let's describe about 3 kinds of destination.

USER DESTINATION

{:user, user_id}

Send message to all the connections for this user.

Recent trend is multi device support. One single user may have a multi connections at the same time.

SESSION DESTINATION

{:session, user_id, session_id}

Send message to a specific connection for this user.

Sometime, this may be a very important feature. For instance, WebRTC-signaling, end-to-end encryption.

CHANNEL DESTINATION

{:channel, channel_id}

Send message to all the members who is belonging to this channel.

How to join or leave channels? See the example below.

def init(session, state) do
   Riverside.LocalDelivery.join_channel("my_channel")
  {:ok, session, state}
end

def handle_message(msg, session, state) do
  dest_channel_id = msg["to"]
  body = msg["body"]

  outgoing = %{
    from: session.user_id,
    body: body,
  }

 Riverside.LocalDelivery.deliver(
    {:channel, dest_channel_id},
    {:text, Poison.encode!(outgoing)}
  )
  {:ok, session, state}
end

def terminate(session, state) do
  Riverside.LocalDelivery.leave_channel("my_channel")
  :ok
end

Shortcuts for delivery

If you want to deliver messages from within your handler, You don't need to use Riverside.LocalDelivery directly.

Here are handy functions.

Let's replace LocalDelivery module to handy version.

def init(session, state) do
   join_channel("my_channel")
  {:ok, session, state}
end

def handle_message(msg, session, state) do
  dest_channel_id = msg["to"]
  body = msg["body"]

  outgoing = %{
    from: session.user_id,
    body: body,
  }
  # same as LocalDelivery.deliver
  # deliver({:channel, dest_channel_id}, {:text, Poison.encode!(outgoing)})

  # handy version, `codec` works on this way, so you don't need to encode by yourself.
 deliver_channel(dest_channel_id, outgoing)

  # If you want to send message to `user`
  # deliver_user(dest_user_id, outgoing)

  # If you want to send message to `session`
  # deliver_session(dest_user_id, dest_user_session_id, outgoing)

  {:ok, session, state}
end

def terminate(session, state) do
  leave_channel("my_channel")
  :ok
end

Echo Back

To deliver message to sender's connection, you can write like following.

deliver_me(msg)

This is same as

deliver_session(session.user_id, session.session_id, msg)

Close

Following like can deliver close message to specific connection.

Riverside.LocalDelivery.close(user_id, session_id)

or just close function.

close()

Example

def handle_message(msg, session, state) do

  if is_bad_message(msg) do
    close()
  else
    # ...
  end

  {:ok, session, state}
end

Scalable Service

LocalDelivery module and its handy shortcuts are just for local. This works only for communications in a single server.

If you need to support more scalable service, consider other solutions. For example, Redis-PubSub, RabbitMQ, or gnatsd.

Here is a example with https://github.com/lyokato/roulette (HashRing-ed gnatsd cluster client)

def init(session, state) do
  with {:ok, _} <- Roulette.sub("user:#{session.user_id}"),
       {:ok, _} <- Roulette.sub("session:#{session.user_id}/#{session.session_id}") do
    {:ok, session, state}
  else
    error ->
      Logger.wran "failed to setup subscription: #{inspect error}"
      {:error, :system_error}
  end
end

def handle_message(msg, session, state) do

  to   = msg["to"]
  body = msg["body"]

  outgoing = %{
    from: session.user_id,
    body: body,
  }

  case Roulette.pub("user:#{to}", Poison.encode!(outgoing)) do
    :ok    -> {:ok, session, state}
    :error -> {:error, :system_error}
  end

end

def handle_info(:pubsub_message, topic, msg, pid}, session, state) do
  deliver_me(:text, msg)
  {:ok, session, state}
end

def terminate(session, state) do
  :ok
end

Configurations

child_spec

{Riverside, [
  handler: MySocketHandler,
  router: MyRouter,
]}
keyworddefault valuedescription
handler--Required. Set your own handler module.
routerRiverside.RouterPlug.Router implementation module which provides endpoints other than ws(s)://

config file

config :my_app, MySocketHandler,
  port: 3000,
  path: "/my_ws",
  codec: Riverside.Codec.RawBinary,
  max_connections: 10000,
  max_connection_age: :infinity,
  show_debug_logs: false,
  idle_timeout: 120_000,
  reuse_port: false,
  transmission_limit: [
    duration: 2000,
    capacity: 50
  ]
keydefault valuedescription
port3000Port number this http server listens.
path/Path for WebSocket endpoint.
max_connections65536maximum number of connections this server can keep. you also pay attention to a configuration for a number of OS's file descriptors
max_connection_age:infinityForce to disconnect a connection if the duration(milliseconds) passed. Then terminate/3 will be called with :over_age as a reason. if :infinity is set, do nothing.
codecRiverside.Codec.JSONtext/binary frame codec.
show_debug_logsfalseIf this flag is true. detailed debug logs will be shown.
transmission_limitduration:2000, capacity:50if <:capacity> frames are sent on a connection in <:duration> milliseconds, disconnect it.Then terminate/3 will be called with :too_many_messages as a reason.
idle_timeout60000Disconnect if no event comes on a connection during this duration
reuse_portfalseTCP SO_REUSEPORT flag

Dynamic Port Number

You may set port number dinamically.

You can set port number like following.

config :my_app, MySocketHandler,
  port: {:system, "MY_PORT", 3000}

Then, port number is picked from runtime environment variable "MY_PORT". if it doesn't exist, 3000 will be used.

Link to this section Summary

Link to this section Types

Link to this type

terminate_reason()
terminate_reason() ::
  {:normal, :shutdown | :timeout}
  | {:remote, :closed}
  | {:remote, :cow_ws.close_code(), binary()}
  | {:error,
     :badencoding
     | :badframe
     | :closed
     | :too_many_massages
     | :over_age
     | atom()}

Link to this section Functions

Link to this function

child_spec(args)

Link to this section Callbacks

Link to this callback

__config__()
__config__() :: map()

Link to this callback

__handle_authentication__(req)
__handle_authentication__(req :: Riverside.AuthRequest.t()) ::
  {:ok, Riverside.Session.user_id(), any()}
  | {:ok, Riverside.Session.user_id(), Riverside.Session.session_id(), any()}
  | {:error, Riverside.AuthError.t()}

Link to this callback

__handle_data__(frame_type, message, session, state)
__handle_data__(
  frame_type :: Riverside.Codec.frame_type(),
  message :: binary(),
  session :: Riverside.Session.t(),
  state :: any()
) :: {:ok, Riverside.Session.t()} | {:error, :invalid_message | :unsupported}

Link to this callback

handle_info(info, session, state)
handle_info(info :: any(), session :: Riverside.Session.t(), state :: any()) ::
  {:ok, Riverside.Session.t(), any()} | {:stop, atom(), any()}

Link to this callback

handle_message(message, session, state)
handle_message(
  message :: any(),
  session :: Riverside.Session.t(),
  state :: any()
) :: {:ok, Riverside.Session.t(), any()} | {:stop, atom(), any()}

Link to this callback

init(session, state)
init(session :: Riverside.Session.t(), state :: any()) ::
  {:ok, Riverside.Session.t(), any()} | {:error, any()}

Link to this callback

terminate(reason, session, state)
terminate(
  reason :: terminate_reason(),
  session :: Riverside.Session.t(),
  state :: any()
) :: :ok