Phoenix.Channel behaviour (Phoenix v1.6.2)
Defines a Phoenix Channel.
Channels provide a means for bidirectional communication from clients that
integrate with the Phoenix.PubSub layer for soft-realtime functionality.
Topics & Callbacks
Every time you join a channel, you need to choose which particular topic you
want to listen to. The topic is just an identifier, but by convention it is
often made of two parts: "topic:subtopic". Using the "topic:subtopic"
approach pairs nicely with Phoenix.Socket.channel/3, allowing you to
match on all topics starting with a given prefix by using a splat (the *
character) as the last character in the topic pattern:
channel "room:*", MyAppWeb.RoomChannel
Any topic coming into the router with the "room:" prefix would dispatch
to MyAppWeb.RoomChannel in the above example. Topics can also be pattern
matched in your channels' join/3 callback to pluck out the scoped pattern:
# handles the special `"lobby"` subtopic
def join("room:lobby", _payload, socket) do
  {:ok, socket}
end
# handles any other subtopic as the room ID, for example `"room:12"`, `"room:34"`
def join("room:" <> room_id, _payload, socket) do
  {:ok, socket}
end
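The two join/3 clauses above rely on Elixir's binary prefix matching and on clause order. As a minimal sketch outside Phoenix, the hypothetical TopicDemo module below (not part of Phoenix) mirrors the same clauses so the matching can be tried in isolation:

```elixir
defmodule TopicDemo do
  # Hypothetical helper, not part of Phoenix: mirrors the clause order of join/3.

  # The literal clause comes first; otherwise the prefix clause below
  # would also match "room:lobby" with room_id bound to "lobby".
  def route("room:lobby"), do: :lobby

  # `<>` in a pattern matches a literal prefix and binds the rest of the binary.
  def route("room:" <> room_id), do: {:room, room_id}

  # Anything without the "room:" prefix falls through to this clause.
  def route(_other), do: :no_match
end

IO.inspect(TopicDemo.route("room:lobby")) # prints :lobby
IO.inspect(TopicDemo.route("room:12"))    # prints {:room, "12"}
IO.inspect(TopicDemo.route("user:7"))     # prints :no_match
```

Note that the subtopic is always bound as a string, so a numeric room ID such as "12" would need an explicit conversion before being used as an integer.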
Authorization
Clients must join a channel to send and receive PubSub events on that channel.
Your channels must implement a join/3 callback that authorizes the socket
for the given topic. For example, you could check if the user is allowed to
join that particular room.
To authorize a socket in join/3, return {:ok, socket}.
To refuse authorization in join/3, return {:error, reply}.
Incoming Events
After a client has successfully joined a channel, incoming events from the
client are routed through the channel's handle_in/3 callbacks. Within these
callbacks, you can perform any action. Typically you'll either forward a
message to all listeners with broadcast!/3, or push a message directly down
the socket with push/3. Incoming callbacks must return the socket to
maintain ephemeral state.
Here's an example of receiving an incoming "new_msg" event from one client,
and broadcasting the message to all topic subscribers for this socket.
def handle_in("new_msg", %{"uid" => uid, "body" => body}, socket) do
  broadcast!(socket, "new_msg", %{uid: uid, body: body})
  {:noreply, socket}
end
General message payloads are received as maps, and binary data payloads are
passed as a {:binary, data} tuple:
def handle_in("file_chunk", {:binary, chunk}, socket) do
  ...
  {:reply, :ok, socket}
end
You can also push a message directly down the socket, in the form of a map,
or a tagged {:binary, data} tuple:
# client asks for their current rank, push sent directly as a new event.
def handle_in("current_rank", _, socket) do
  push(socket, "current_rank", %{val: Game.get_rank(socket.assigns[:user])})
  push(socket, "photo", {:binary, File.read!(socket.assigns.photo_path)})
  {:noreply, socket}
end
Replies
In addition to pushing messages out when you receive a handle_in event,
you can also reply directly to a client event for request/response style
messaging. This is useful when a client must know the result of an operation
or to simply ack messages.
For example, imagine creating a resource and replying with the created record:
def handle_in("create:post", attrs, socket) do
  changeset = Post.changeset(%Post{}, attrs)
  if changeset.valid? do
    post = Repo.insert!(changeset)
    response = MyAppWeb.PostView.render("show.json", %{post: post})
    {:reply, {:ok, response}, socket}
  else
    response = MyAppWeb.ChangesetView.render("errors.json", %{changeset: changeset})
    {:reply, {:error, response}, socket}
  end
end
Alternatively, you may just want to ack the status of the operation:
def handle_in("create:post", attrs, socket) do
  changeset = Post.changeset(%Post{}, attrs)
  if changeset.valid? do
    Repo.insert!(changeset)
    {:reply, :ok, socket}
  else
    {:reply, :error, socket}
  end
end
Like binary pushes, binary data is also supported with replies via a {:binary, data} tuple:
{:reply, {:ok, {:binary, bin}}, socket}
Intercepting Outgoing Events
When an event is broadcasted with broadcast/3, each channel subscriber can
choose to intercept the event and have their handle_out/3 callback triggered.
This allows the event's payload to be customized on a socket by socket basis
to append extra information, or conditionally filter the message from being
delivered. If the event is not intercepted with Phoenix.Channel.intercept/1,
then the message is pushed directly to the client:
intercept ["new_msg", "user_joined"]
# for every socket subscribing to this topic, append an `is_editable`
# value for client metadata.
def handle_out("new_msg", msg, socket) do
  push(socket, "new_msg", Map.merge(msg,
    %{is_editable: User.can_edit_message?(socket.assigns[:user], msg)}
  ))
  {:noreply, socket}
end
# do not send broadcasted `"user_joined"` events if this socket's user
# is ignoring the user who joined.
def handle_out("user_joined", msg, socket) do
  unless User.ignoring?(socket.assigns[:user], msg.user_id) do
    push(socket, "user_joined", msg)
  end
  {:noreply, socket}
end
Broadcasting to an external topic
In some cases, you will want to broadcast messages without the context of
a socket. This could be for broadcasting from within your channel to an
external topic, or broadcasting from elsewhere in your application like a
controller or another process. This can be done via your endpoint:
# within channel
def handle_in("new_msg", %{"uid" => uid, "body" => body}, socket) do
  ...
  broadcast_from!(socket, "new_msg", %{uid: uid, body: body})
  MyAppWeb.Endpoint.broadcast_from!(self(), "room:superadmin",
    "new_msg", %{uid: uid, body: body})
  {:noreply, socket}
end
# within controller
def create(conn, params) do
  ...
  MyAppWeb.Endpoint.broadcast!("room:" <> rid, "new_msg", %{uid: uid, body: body})
  MyAppWeb.Endpoint.broadcast!("room:superadmin", "new_msg", %{uid: uid, body: body})
  redirect(conn, to: "/")
end
Terminate
On termination, the channel callback terminate/2 will be invoked with
the error reason and the socket.
If we are terminating because the client left, the reason will be
{:shutdown, :left}. Similarly, if we are terminating because the
client connection was closed, the reason will be {:shutdown, :closed}.
If any of the callbacks return a :stop tuple, it will also
trigger terminate with the reason given in the tuple.
terminate/2, however, won't be invoked in case of errors nor in
case of exits. This is the same behaviour as you find in Elixir
abstractions like GenServer and others. Similar to GenServer,
it is also possible to trap exits (:trap_exit) to guarantee that terminate/2
is invoked. This practice is not encouraged though.
Typically speaking, if you want to clean something up, it is better to
monitor your channel process and do the clean up from another process.
All channel callbacks including join/3 are called from within the
channel process. Therefore, self() in any of them returns the PID to
be monitored.
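The monitoring approach described above could look roughly like the following sketch. CleanupMonitor and watch/3 are hypothetical names, not part of Phoenix; the sketch only assumes standard GenServer and Process.monitor/1 behaviour. A channel would call watch/3 with self() from join/3; here a plain spawned process stands in for the channel so the example runs on its own:

```elixir
defmodule CleanupMonitor do
  # Hypothetical helper, not part of Phoenix.
  use GenServer

  def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, %{}, opts)

  # Registers `pid` together with a zero-arity cleanup function.
  def watch(server, pid, cleanup) when is_pid(pid) and is_function(cleanup, 0) do
    GenServer.call(server, {:watch, pid, cleanup})
  end

  @impl true
  def init(state), do: {:ok, state}

  @impl true
  def handle_call({:watch, pid, cleanup}, _from, state) do
    ref = Process.monitor(pid)
    {:reply, :ok, Map.put(state, ref, cleanup)}
  end

  @impl true
  def handle_info({:DOWN, ref, :process, _pid, _reason}, state) do
    # Delivered for every kind of exit, including crashes and kills,
    # which are the cases where terminate/2 is not guaranteed to run.
    {cleanup, state} = Map.pop(state, ref)
    if cleanup, do: cleanup.()
    {:noreply, state}
  end
end

# Stand-in for a channel process: it is killed without any chance to clean up.
{:ok, monitor} = CleanupMonitor.start_link()
parent = self()
worker = spawn(fn -> Process.sleep(:infinity) end)
:ok = CleanupMonitor.watch(monitor, worker, fn -> send(parent, :cleaned_up) end)
Process.exit(worker, :kill)

receive do
  :cleaned_up -> IO.puts("cleanup ran")
after
  1_000 -> IO.puts("cleanup did not run")
end
```

Because the cleanup function runs inside the monitor process, it should be short and should not assume any channel state is still available.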
Exit reasons when stopping a channel
When the channel callbacks return a :stop tuple, such as:
{:stop, :shutdown, socket}
{:stop, {:error, :enoent}, socket}
the second argument is the exit reason, which follows the same behaviour as
standard GenServer exits.
You have three options to choose from when shutting down a channel:
:normal - in such cases, the exit won't be logged and linked processes do not exit
:shutdown or {:shutdown, term} - in such cases, the exit won't be logged and linked processes exit with the same reason unless they're trapping exits
any other term - in such cases, the exit will be logged and linked processes exit with the same reason unless they're trapping exits
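Since these exit reasons follow standard GenServer semantics, they can be observed with a plain GenServer. The sketch below uses a hypothetical StopDemo module (not part of Phoenix) that stops with a given reason and returns what a monitoring process sees. Only the two non-logging kinds of reason are exercised, so the example produces no error log output:

```elixir
defmodule StopDemo do
  # Hypothetical GenServer used only to observe exit reasons.
  use GenServer

  @impl true
  def init(:ok), do: {:ok, nil}

  @impl true
  def handle_cast({:stop, reason}, state), do: {:stop, reason, state}

  # Starts an unlinked server, stops it with `reason`, and returns the
  # exit reason reported to a monitoring process.
  def observe(reason) do
    {:ok, pid} = GenServer.start(__MODULE__, :ok)
    ref = Process.monitor(pid)
    GenServer.cast(pid, {:stop, reason})

    receive do
      {:DOWN, ^ref, :process, ^pid, exit_reason} -> exit_reason
    after
      1_000 -> :timeout
    end
  end
end

IO.inspect(StopDemo.observe(:normal))            # prints :normal
IO.inspect(StopDemo.observe({:shutdown, :left})) # prints {:shutdown, :left}
```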
Subscribing to external topics
Sometimes you may need to programmatically subscribe a socket to external
topics in addition to the internal socket.topic. For example,
imagine you have a bidding system where a remote client dynamically sets
preferences on products they want to receive bidding notifications on.
Instead of requiring a unique channel process and topic per
preference, a more efficient and simple approach would be to subscribe a
single channel to relevant notifications via your endpoint. For example:
defmodule MyAppWeb.Endpoint.NotificationChannel do
  use Phoenix.Channel
  def join("notification:" <> _user_id, %{"ids" => ids}, socket) do
    topics = for product_id <- ids, do: "product:#{product_id}"
    {:ok, socket
          |> assign(:topics, [])
          |> put_new_topics(topics)}
  end
  def handle_in("watch", %{"product_id" => id}, socket) do
    {:reply, :ok, put_new_topics(socket, ["product:#{id}"])}
  end
  def handle_in("unwatch", %{"product_id" => id}, socket) do
    topic = "product:#{id}"
    # unsubscribe/1 returns :ok, not a socket, so the socket is returned explicitly
    :ok = MyAppWeb.Endpoint.unsubscribe(topic)
    {:reply, :ok, assign(socket, :topics, List.delete(socket.assigns.topics, topic))}
  end
  defp put_new_topics(socket, topics) do
    Enum.reduce(topics, socket, fn topic, acc ->
      topics = acc.assigns.topics
      if topic in topics do
        acc
      else
        :ok = MyAppWeb.Endpoint.subscribe(topic)
        assign(acc, :topics, [topic | topics])
      end
    end)
  end
end
Note: the caller must be responsible for preventing duplicate subscriptions.
After calling subscribe/1 from your endpoint, the same flow applies to
handling regular Elixir messages within your channel. Most often, you'll
simply relay the %Phoenix.Socket.Broadcast{} event and payload:
alias Phoenix.Socket.Broadcast
def handle_info(%Broadcast{topic: _, event: event, payload: payload}, socket) do
  push(socket, event, payload)
  {:noreply, socket}
end
Hibernation
From Erlang/OTP 20, channels automatically hibernate to save memory
after 15_000 milliseconds of inactivity. This can be customized by
passing the :hibernate_after option to use Phoenix.Channel:
use Phoenix.Channel, hibernate_after: 60_000
You can also set it to :infinity to fully disable it.
Shutdown
You can configure the shutdown timeout each channel is given when your
application is shutting down by setting the :shutdown value on use:
use Phoenix.Channel, shutdown: 5_000
It defaults to 5_000 milliseconds.
Logging
By default, channel "join" and "handle_in" events are logged, using
the level :info and :debug, respectively. Logs can be customized per
event type or disabled by setting the :log_join and :log_handle_in
options when using Phoenix.Channel. For example, the following
configuration logs join events as :info, but disables logging for
incoming events:
use Phoenix.Channel, log_join: :info, log_handle_in: false
Summary
Callbacks
handle_call/3: Handle regular GenServer call messages.
handle_cast/2: Handle regular GenServer cast messages.
handle_in/3: Handle incoming events.
handle_info/2: Handle regular Elixir process messages.
handle_out/3: Intercepts outgoing events.
join/3: Handle channel joins by topic.
terminate/2: Invoked when the channel process is about to exit.
Functions
broadcast/3: Broadcast an event to all subscribers of the socket topic.
broadcast!/3: Same as broadcast/3, but raises if broadcast fails.
broadcast_from/3: Broadcast event from pid to all subscribers of the socket topic.
broadcast_from!/3: Same as broadcast_from/3, but raises if broadcast fails.
intercept/1: Defines which Channel events to intercept for handle_out/3 callbacks.
push/3: Sends event to the socket.
reply/2: Replies asynchronously to a socket push.
socket_ref/1: Generates a socket_ref for an async reply.
Types
payload()
reply()
socket_ref()
Callbacks
Specs
code_change(old_vsn, Phoenix.Socket.t(), extra :: term()) ::
  {:ok, Phoenix.Socket.t()} | {:error, reason :: term()}
when old_vsn: term() | {:down, term()}
Specs
handle_call(msg :: term(), from :: {pid(), tag :: term()}, socket :: Phoenix.Socket.t()) ::
  {:reply, response :: term(), Phoenix.Socket.t()}
  | {:noreply, Phoenix.Socket.t()}
  | {:stop, reason :: term(), Phoenix.Socket.t()}
Handle regular GenServer call messages.
Specs
handle_cast(msg :: term(), socket :: Phoenix.Socket.t()) :: {:noreply, Phoenix.Socket.t()} | {:stop, reason :: term(), Phoenix.Socket.t()}
Handle regular GenServer cast messages.
Specs
handle_in(event :: String.t(), payload :: payload(), socket :: Phoenix.Socket.t()) ::
  {:noreply, Phoenix.Socket.t()}
  | {:noreply, Phoenix.Socket.t(), timeout() | :hibernate}
  | {:reply, reply(), Phoenix.Socket.t()}
  | {:stop, reason :: term(), Phoenix.Socket.t()}
  | {:stop, reason :: term(), reply(), Phoenix.Socket.t()}
Handle incoming events.
Example
def handle_in("ping", payload, socket) do
  {:reply, {:ok, payload}, socket}
end
Specs
handle_info(msg :: term(), socket :: Phoenix.Socket.t()) :: {:noreply, Phoenix.Socket.t()} | {:stop, reason :: term(), Phoenix.Socket.t()}
Handle regular Elixir process messages.
Specs
handle_out(event :: String.t(), payload :: payload(), socket :: Phoenix.Socket.t()) ::
  {:noreply, Phoenix.Socket.t()}
  | {:noreply, Phoenix.Socket.t(), timeout() | :hibernate}
  | {:stop, reason :: term(), Phoenix.Socket.t()}
Intercepts outgoing events.
See intercept/1.
Specs
join(topic :: binary(), payload :: payload(), socket :: Phoenix.Socket.t()) ::
  {:ok, Phoenix.Socket.t()}
  | {:ok, reply :: payload(), Phoenix.Socket.t()}
  | {:error, reason :: map()}
Handle channel joins by topic.
To authorize a socket, return {:ok, socket} or {:ok, reply, socket}. To
refuse authorization, return {:error, reason}.
Example
def join("room:lobby", payload, socket) do
  if authorized?(payload) do
    {:ok, socket}
  else
    {:error, %{reason: "unauthorized"}}
  end
end
Specs
terminate(
  reason :: :normal | :shutdown | {:shutdown, :left | :closed | term()},
  Phoenix.Socket.t()
) :: term()
Invoked when the channel process is about to exit.
Functions
broadcast/3
Broadcast an event to all subscribers of the socket topic.
The event's message must be a serializable map or a tagged {:binary, data}
tuple where data is binary data.
Examples
iex> broadcast(socket, "new_message", %{id: 1, content: "hello"})
:ok
iex> broadcast(socket, "new_message", {:binary, "hello"})
:ok
broadcast!/3
Same as broadcast/3, but raises if broadcast fails.
broadcast_from/3
Broadcast event from pid to all subscribers of the socket topic.
The channel that owns the socket will not receive the published
message. The event's message must be a serializable map or a tagged
{:binary, data} tuple where data is binary data.
Examples
iex> broadcast_from(socket, "new_message", %{id: 1, content: "hello"})
:ok
iex> broadcast_from(socket, "new_message", {:binary, "hello"})
:ok
broadcast_from!/3
Same as broadcast_from/3, but raises if broadcast fails.
intercept/1
Defines which Channel events to intercept for handle_out/3 callbacks.
By default, broadcasted events are pushed directly to the client, but intercepting events gives your channel a chance to customize the event for the client to append extra information or filter the message from being delivered.
Note: intercepting events can introduce significantly more overhead if a large number of subscribers must customize a message since the broadcast will be encoded N times instead of a single shared encoding across all subscribers.
Examples
intercept ["new_msg"]
def handle_out("new_msg", payload, socket) do
  # Map.merge/2 expects two maps, so the extra field is passed as a map
  push(socket, "new_msg", Map.merge(payload,
    %{is_editable: User.can_edit_message?(socket.assigns[:user], payload)}
  ))
  {:noreply, socket}
end
handle_out/3 callbacks must return one of:
{:noreply, Socket.t} |
{:noreply, Socket.t, timeout | :hibernate} |
{:stop, reason :: term, Socket.t}
push/3
Sends event to the socket.
The event's message must be a serializable map or a tagged {:binary, data}
tuple where data is binary data.
Examples
iex> push(socket, "new_message", %{id: 1, content: "hello"})
:ok
iex> push(socket, "new_message", {:binary, "hello"})
:ok
Specs
reply(socket_ref(), reply()) :: :ok
Replies asynchronously to a socket push.
Useful when you need to reply to a push that can't otherwise be handled using
the {:reply, {status, payload}, socket} return from your handle_in
callbacks. reply/2 will be used in the rare cases you need to perform work in
another process and reply when finished by generating a reference to the push
with socket_ref/1.
Note: In such cases, a socket_ref should be generated and
passed to the external process, so the socket itself is not leaked outside
the channel. The socket holds information such as assigns and transport
configuration, so it's important to not copy this information outside of the
channel that owns it.
Examples
def handle_in("work", payload, socket) do
  Worker.perform(payload, socket_ref(socket))
  {:noreply, socket}
end
def handle_info({:work_complete, result, ref}, socket) do
  reply(ref, {:ok, result})
  {:noreply, socket}
end
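The Worker module in the example above is not defined in these docs. One possible shape for it, purely as an assumption, is sketched below: perform/2 captures the caller's PID (the channel process), does the work in a separate task, and sends back the {:work_complete, result, ref} message that the handle_info/2 clause expects. The do_work/1 body is a placeholder:

```elixir
defmodule Worker do
  # Hypothetical module; the name and message shape follow the example above.

  # Called from the channel process, so self() here is the channel PID.
  def perform(payload, ref) do
    channel = self()

    Task.start(fn ->
      result = do_work(payload)
      # Only the opaque ref travels back; the socket never leaves the channel.
      send(channel, {:work_complete, result, ref})
    end)

    :ok
  end

  # Placeholder for the real, slow computation.
  defp do_work(payload), do: %{echo: payload}
end
```

Task.start/1 is unlinked and unsupervised, so a crash in the task means the client never gets a reply; a supervised task or a client-side timeout may be preferable in real code.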
Specs
socket_ref(Phoenix.Socket.t()) :: socket_ref()
Generates a socket_ref for an async reply.
See reply/2 for example usage.