Redix.PubSub (Redix v1.5.2)
Interface for the Redis pub/sub functionality.
The rest of this documentation will assume the reader knows how pub/sub works in Redis and knows the meaning of the following Redis commands:
SUBSCRIBE and UNSUBSCRIBE
PSUBSCRIBE and PUNSUBSCRIBE
PUBLISH
Usage
Each Redix.PubSub process is able to subscribe to/unsubscribe from multiple Redis channels/patterns, and is able to handle multiple Elixir processes subscribing each to different channels/patterns.
A Redix.PubSub process can be started via Redix.PubSub.start_link/2; such a process holds a single TCP (or SSL) connection to the Redis server.
Redix.PubSub has a message-oriented API. Subscribe operations are synchronous and return a reference that can then be used to match on all messages sent by the Redix.PubSub process.
When Redix.PubSub registers a subscription, the subscriber process will receive a confirmation message:
{:ok, pubsub} = Redix.PubSub.start_link()
{:ok, ref} = Redix.PubSub.subscribe(pubsub, "my_channel", self())
receive do message -> message end
#=> {:redix_pubsub, ^pubsub, ^ref, :subscribed, %{channel: "my_channel"}}
When the :subscribed message is received, it's guaranteed that the Redix.PubSub process has subscribed to the given channel. This means that after a subscription, messages published to a channel are delivered to all Elixir processes subscribed to that channel via Redix.PubSub:
# Someone publishes "hello" on "my_channel"
receive do message -> message end
#=> {:redix_pubsub, ^pubsub, ^ref, :message, %{channel: "my_channel", payload: "hello"}}
It's advised to wait for the subscription confirmation for a channel before doing any other operation involving that channel.
Note that unsubscription confirmations are delivered right away even if the Redix.PubSub process is still subscribed to the given channel: this is by design, as once a process is unsubscribed from a channel it won't receive messages anyways, even if the Redix.PubSub process still receives them.
Messages are also delivered as a confirmation of an unsubscription as well as when the Redix.PubSub connection goes down. See the "Messages" section below.
Messages
Most of the communication with a PubSub connection is done via (Elixir) messages: the subscribers of these messages will be the processes specified at subscription time (in subscribe/3 or psubscribe/3). All Redix.PubSub messages have the same form: they're a five-element tuple that looks like this:
{:redix_pubsub, pubsub_pid, subscription_ref, message_type, message_properties}
where:
pubsub_pid is the pid of the Redix.PubSub process that sent this message.
subscription_ref is the reference returned by subscribe/3 or psubscribe/3.
message_type is the type of this message, such as :subscribed for subscription confirmations, :message for pub/sub messages, and so on.
message_properties is a map of data related to the message; its contents vary based on message_type.
Given this format, it's easy to match on all Redix pub/sub messages for a subscription as {:redix_pubsub, _, ^subscription_ref, _, _}.
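The match on the subscription reference can be sketched as a selective receive. The module name PubSubInbox and the function next/2 are hypothetical helpers for illustration and are not part of Redix; only the five-element tuple shape comes from this page.

```elixir
defmodule PubSubInbox do
  @moduledoc "Hypothetical helper for illustration; not part of Redix."

  # Waits for the next Redix.PubSub message tagged with `subscription_ref`.
  # Messages that belong to other subscriptions are left in the mailbox,
  # because `receive` only removes the first message that matches the clause.
  def next(subscription_ref, timeout \\ 5_000) when is_reference(subscription_ref) do
    receive do
      {:redix_pubsub, _pubsub_pid, ^subscription_ref, type, properties} ->
        {:ok, type, properties}
    after
      timeout -> {:error, :timeout}
    end
  end
end
```

Because the reference is pinned, a process that holds several subscriptions can call next/2 once per reference without one subscription consuming another's messages.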
List of possible message types and properties
The following is a comprehensive list of possible message types alongside the properties that each can have.
:subscribed - sent as confirmation of subscription to a channel (via subscribe/3 or after a disconnection and reconnection). One :subscribed message is received for every channel a process subscribed to. :subscribed messages have the following properties:
  :channel - the channel the process has been subscribed to.
:psubscribed - sent as confirmation of subscription to a pattern (via psubscribe/3 or after a disconnection and reconnection). One :psubscribed message is received for every pattern a process subscribed to. :psubscribed messages have the following properties:
  :pattern - the pattern the process has been subscribed to.
:unsubscribed - sent as confirmation of unsubscription from a channel (via unsubscribe/3). :unsubscribed messages are received for every channel a process unsubscribes from. :unsubscribed messages have the following properties:
  :channel - the channel the process has unsubscribed from.
:punsubscribed - sent as confirmation of unsubscription from a pattern (via punsubscribe/3). :punsubscribed messages are received for every pattern a process unsubscribes from. :punsubscribed messages have the following properties:
  :pattern - the pattern the process has unsubscribed from.
:message - sent to subscribers to a given channel when a message is published on that channel. :message messages have the following properties:
  :channel - the channel the message was published on
  :payload - the contents of the message
:pmessage - sent to subscribers to a given pattern when a message is published on a channel that matches that pattern. :pmessage messages have the following properties:
  :channel - the channel the message was published on
  :pattern - the original pattern that matched the channel
  :payload - the contents of the message
:disconnected - sent to all subscribers to all channels/patterns when the connection to Redis is interrupted. :disconnected messages have the following properties:
  :error - the reason for the disconnection, a Redix.ConnectionError exception struct (that can be raised or turned into a message through Exception.message/1).
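As a rough illustration of how the message types and their properties fit together, the following sketch maps each message to a short description. PubSubEvents is a hypothetical module, not part of Redix. The type atoms used are the ones that appear in the message examples on this page (:subscribed, :psubscribed, :unsubscribed, :punsubscribed).

```elixir
defmodule PubSubEvents do
  @moduledoc "Hypothetical helper for illustration; not part of Redix."

  # Entry point: unpacks the five-element tuple and dispatches on the type.
  def describe({:redix_pubsub, _pid, _ref, type, properties}) do
    describe(type, properties)
  end

  defp describe(:subscribed, %{channel: channel}), do: "subscribed to #{channel}"
  defp describe(:psubscribed, %{pattern: pattern}), do: "subscribed to pattern #{pattern}"
  defp describe(:unsubscribed, %{channel: channel}), do: "unsubscribed from #{channel}"
  defp describe(:punsubscribed, %{pattern: pattern}), do: "unsubscribed from pattern #{pattern}"

  defp describe(:message, %{channel: channel, payload: payload}) do
    "#{channel}: #{payload}"
  end

  defp describe(:pmessage, %{channel: channel, pattern: pattern, payload: payload}) do
    "#{channel} (matched #{pattern}): #{payload}"
  end

  # The :error property is an exception struct, so Exception.message/1 applies.
  defp describe(:disconnected, %{error: error}) do
    "disconnected: #{Exception.message(error)}"
  end
end
```

A real subscriber would more likely place these clauses in handle_info/2 of a GenServer; the plain function form is used here only to keep the sketch short.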
Reconnections
Redix.PubSub tries to be resilient to failures: when the connection with Redis is interrupted (for whatever reason), it will try to reconnect to the Redis server. When a disconnection happens, Redix.PubSub will notify all clients subscribed to all channels with a {:redix_pubsub, pid, subscription_ref, :disconnected, _} message (more on the format of messages above). When the connection goes back up, Redix.PubSub takes care of actually re-subscribing to the appropriate channels on the Redis server and subscribers are notified with a {:redix_pubsub, pid, subscription_ref, :subscribed | :psubscribed, _} message, the same as when a client subscribes to a channel/pattern.
Note that if exit_on_disconnection: true is passed to Redix.PubSub.start_link/2, the Redix.PubSub process will exit and not send any :disconnected messages to subscribed clients.
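One way a subscriber might keep track of this disconnect/resubscribe cycle is to fold the incoming messages into a small state map. The sketch below is an assumption about how client code could be organised; ConnectionTracker and its state shape are hypothetical and not part of Redix.

```elixir
defmodule ConnectionTracker do
  @moduledoc "Hypothetical helper for illustration; not part of Redix."

  # `connected` is false until the first confirmation arrives;
  # `active` holds the channels/patterns confirmed since the last disconnection.
  def new, do: %{connected: false, active: MapSet.new()}

  # A disconnection invalidates every confirmation received so far.
  def handle(state, {:redix_pubsub, _pid, _ref, :disconnected, %{error: _error}}) do
    %{state | connected: false, active: MapSet.new()}
  end

  # Confirmations arrive both for explicit subscriptions and after a reconnection.
  def handle(state, {:redix_pubsub, _pid, _ref, :subscribed, %{channel: channel}}) do
    %{state | connected: true, active: MapSet.put(state.active, {:channel, channel})}
  end

  def handle(state, {:redix_pubsub, _pid, _ref, :psubscribed, %{pattern: pattern}}) do
    %{state | connected: true, active: MapSet.put(state.active, {:pattern, pattern})}
  end

  # Any other message leaves the state untouched.
  def handle(state, _other), do: state
end
```

While connected is false, published messages cannot reach the subscriber, so application code may want to treat any cached view of the channel as stale until the next confirmation.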
Sentinel support
Works exactly the same as for normal Redix connections. See the documentation for Redix for more information.
Examples
This is an example of a workflow using the PubSub functionality; it uses Redix as a Redis client for publishing messages.
{:ok, pubsub} = Redix.PubSub.start_link()
{:ok, client} = Redix.start_link()
{:ok, ref} = Redix.PubSub.subscribe(pubsub, "my_channel", self())
# We wait for the subscription confirmation
receive do
  {:redix_pubsub, ^pubsub, ^ref, :subscribed, %{channel: "my_channel"}} -> :ok
end
Redix.command!(client, ~w(PUBLISH my_channel hello))
receive do
  {:redix_pubsub, ^pubsub, ^ref, :message, %{channel: "my_channel"} = properties} ->
    properties.payload
end
#=> "hello"
Redix.PubSub.unsubscribe(pubsub, "my_channel", self())
#=> :ok
# We wait for the unsubscription confirmation
receive do
  {:redix_pubsub, ^pubsub, ^ref, :unsubscribed, _} -> :ok
end
Summary
Functions
get_client_id/1 - Gets the Redis CLIENT ID associated with a connection.
psubscribe/3 - Subscribes subscriber to the given pattern or list of patterns.
punsubscribe/3 - Unsubscribes subscriber from the given pattern or list of patterns.
start_link/1 - Starts a pub/sub connection to Redis.
start_link/2 - Same as start_link/1 but using both a Redis URI and a list of options.
stop/2 - Stops the given pub/sub process.
subscribe/3 - Subscribes subscriber to the given channel or list of channels.
unsubscribe/3 - Unsubscribes subscriber from the given channel or list of channels.
Types
@type connection() :: GenServer.server()
Functions
@spec get_client_id(connection()) :: {:ok, integer()} | {:error, Redix.ConnectionError.t()}
Gets the Redis CLIENT ID associated with a connection.
This is useful for implementing client-side caching, where you can subscribe your pub/sub connection to changes on keys.
If the pub/sub connection is currently disconnected, this function returns {:error, error}.
This function requires the Redix.PubSub connection to have been started with the fetch_client_id_on_connect: true option. This requires Redis 5.0.0 or later, since that's where the CLIENT ID command was introduced.
Examples
iex> Redix.PubSub.get_client_id(conn)
{:ok, 123}
If the connection is not currently connected:
iex> Redix.PubSub.get_client_id(conn)
{:error, %Redix.ConnectionError{reason: :disconnected}}
If the connection was not storing the client ID:
iex> Redix.PubSub.get_client_id(conn)
{:error, %Redix.ConnectionError{reason: :client_id_not_stored}}
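For the client-side caching use case mentioned for get_client_id/1, a rough sketch of the Redis side follows. It assumes Redis 6 or later in RESP2 mode, where a regular connection enables tracking with CLIENT TRACKING ON REDIRECT and invalidations are published on the __redis__:invalidate channel. TrackingSetup is a hypothetical module, not part of Redix.

```elixir
defmodule TrackingSetup do
  @moduledoc "Hypothetical helper for illustration; not part of Redix."

  # Channel on which Redis publishes invalidation messages in RESP2 mode.
  @invalidation_channel "__redis__:invalidate"

  def invalidation_channel, do: @invalidation_channel

  # Builds the command that a regular (non pub/sub) connection sends so that
  # invalidations are redirected to the pub/sub connection with `client_id`.
  def tracking_command(client_id) when is_integer(client_id) do
    ["CLIENT", "TRACKING", "ON", "REDIRECT", Integer.to_string(client_id)]
  end
end
```

In use, the pub/sub connection would subscribe to TrackingSetup.invalidation_channel() with subscribe/3, and the command list would be sent on a separate Redix connection, for example with Redix.command!/2, using the ID returned by get_client_id/1.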
@spec psubscribe(connection(), String.t() | [String.t()], subscriber()) :: {:ok, reference()}
Subscribes subscriber to the given pattern or list of patterns.
Works like subscribe/3 but subscribing subscriber to a pattern (or list of patterns) instead of regular channels.
Upon successful subscription to each of the patterns, a message will be sent to subscriber with the following form:
{:redix_pubsub, pid, ^subscription_ref, :psubscribed, %{pattern: pattern}}
See the documentation for Redix.PubSub for more information about the format of messages.
Examples
iex> Redix.PubSub.psubscribe(conn, "ba*", self())
{:ok, subscription_ref}
iex> flush()
{:redix_pubsub, ^conn, ^subscription_ref, :psubscribed, %{pattern: "ba*"}}
:ok
@spec punsubscribe(connection(), String.t() | [String.t()], subscriber()) :: :ok
Unsubscribes subscriber from the given pattern or list of patterns.
This function basically "undoes" what psubscribe/3 does: it unsubscribes subscriber from the given pattern or list of patterns.
Upon successful unsubscription from each of the patterns, a message will be sent to subscriber with the following form:
{:redix_pubsub, pid, ^subscription_ref, :punsubscribed, %{pattern: pattern}}
See the documentation for Redix.PubSub for more information about the format of messages.
Examples
iex> Redix.PubSub.punsubscribe(conn, "foo_*", self())
:ok
iex> flush()
{:redix_pubsub, ^conn, ^subscription_ref, :punsubscribed, %{pattern: "foo_*"}}
:ok
Starts a pub/sub connection to Redis.
This function returns {:ok, pid} if the PubSub process is started successfully.
The actual TCP/SSL connection to the Redis server may happen either synchronously, before start_link/2 returns, or asynchronously: this behaviour is decided by the :sync_connect option (see below).
This function accepts one argument, either a Redis URI as a string or a list of options.
Redis URI
In case uri_or_opts is a Redis URI, it must be in the form:
redis://[:password@]host[:port][/db]
Here are some examples of valid URIs:
redis://localhost
redis://:secret@localhost:6397
redis://username:secret@localhost:6397
redis://example.com:6380/1
The only mandatory thing when using URIs is the host. All other elements are optional and their default value can be found in the "Options" section below.
In earlier versions of Redix, the username in the URI was ignored. Redis 6 introduced ACL support. Now, Redix supports usernames as well.
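To make the URI components concrete, the following sketch splits a Redis URI with the standard library's URI.parse/1. This is an illustration only and not necessarily how Redix parses URIs internally; RedisURISketch and parts/1 are hypothetical names.

```elixir
defmodule RedisURISketch do
  @moduledoc "Hypothetical helper for illustration; not part of Redix."

  # Splits a redis:// URI into host, port, username, password and database.
  # Elements missing from the URI come back as nil. Only the "redis" scheme
  # is accepted in this sketch; anything else raises a MatchError.
  def parts(uri) when is_binary(uri) do
    %URI{scheme: "redis", host: host, port: port, userinfo: userinfo, path: path} =
      URI.parse(uri)

    {username, password} = split_userinfo(userinfo)

    %{host: host, port: port, username: username, password: password, database: database(path)}
  end

  defp split_userinfo(nil), do: {nil, nil}

  defp split_userinfo(userinfo) do
    case String.split(userinfo, ":", parts: 2) do
      ["", password] -> {nil, password}
      [username, password] -> {username, password}
      [username] -> {username, nil}
    end
  end

  # The database is the path without its leading slash, if there is one.
  defp database(nil), do: nil
  defp database(""), do: nil
  defp database("/"), do: nil
  defp database("/" <> db), do: db
end
```

For instance, RedisURISketch.parts("redis://example.com:6380/1") yields the host "example.com", the port 6380 and the database "1", with username and password left as nil.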
Options
The following options can be used to specify the connection:
:host (String.t/0) - the host where the Redis server is running. If you are using a Redis URI, you cannot use this option. Defaults to "localhost".
:port (non_neg_integer/0) - the port on which the Redis server is running. If you are using a Redis URI, you cannot use this option. Defaults to 6379.
:database (String.t/0 or non_neg_integer/0) - the database to connect to. Defaults to nil, meaning Redix doesn't connect to a specific database (the default in this case is database 0). When this option is provided, all Redix does is issue a SELECT command to Redis in order to select the given database.
:username - the username to connect to Redis. Defaults to nil, meaning no username is used. Redis supports usernames only since Redis 6 (see the ACL documentation). If a username is provided (either via options or via URIs) and the Redis version used doesn't support ACL, then Redix falls back to using just the password and emits a warning. In future Redix versions, Redix will raise if a username is passed and the Redis version used doesn't support ACL.
:password (Redix.password/0) - the password used to connect to Redis. Defaults to nil, meaning no password is used. When this option is provided, all Redix does is issue an AUTH command to Redis in order to authenticate. MFAs are also supported in the form of {module, function, arguments}. This can be used to fetch the password dynamically on every reconnection but most importantly to hide the password from crash reports in case the Redix connection crashes for any reason. For example, you can set this option to: {System, :fetch_env!, ["REDIX_PASSWORD"]}.
:timeout (timeout/0) - connection timeout (in milliseconds) directly passed to the network layer. The default value is 5000.
:sync_connect (boolean/0) - decides whether Redix should initiate the network connection to the Redis server before or after returning from start_link/1. This option also changes some reconnection semantics; read the "Reconnections" page in the documentation for more information. The default value is false.
:exit_on_disconnection (boolean/0) - if true, the Redix server will exit if it fails to connect or disconnects from Redis. Note that setting this option to true means that the :backoff_initial and :backoff_max options will be ignored. The default value is false.
:backoff_initial (non_neg_integer/0) - the initial backoff time (in milliseconds), which is the time that the Redix process will wait before attempting to reconnect to Redis after a disconnection or failed first connection. See the "Reconnections" page in the docs for more information. The default value is 500.
:backoff_max (timeout/0) - the maximum length (in milliseconds) of the time interval used between reconnection attempts. See the "Reconnections" page in the docs for more information. The default value is 30000.
:ssl (boolean/0) - if true, connect through SSL, otherwise through TCP. The :socket_opts option applies to both SSL and TCP, so it can be used for things like certificates. See :ssl.connect/4. The default value is false.
:name (term/0) - Redix is bound to the same registration rules as a GenServer. See the GenServer documentation for more information.
:socket_opts (list of term/0) - specifies a list of options that are passed to the network layer when connecting to the Redis server. Some socket options (like :active or :binary) will be overridden by Redix so that it functions properly. If ssl: true, then these are added to the default: [verify: :verify_peer, depth: 3]. If the CAStore dependency is available, the :cacertfile option is added to the SSL options by default as well. The default value is [].
:hibernate_after (non_neg_integer/0) - if present, the Redix connection process awaits any message for the given number of milliseconds and if no message is received, the process goes into hibernation automatically (by calling :proc_lib.hibernate/3). See :gen_statem.start_opt/0. Not present by default.
:spawn_opt (keyword/0) - if present, its value is passed as options to the Redix connection process as in Process.spawn/4. See :gen_statem.start_opt/0. Not present by default.
:debug (keyword/0) - if present, the corresponding function in the :sys module is invoked.
:fetch_client_id_on_connect (boolean/0) - if true, Redix will fetch the client ID after connecting to Redis and before subscribing to any topic. You can then read the client ID of the pub/sub connection with get_client_id/1. This option uses the CLIENT ID command under the hood, which is available since Redis 5.0.0. This option is available since v1.4.1. The default value is false.
:sentinel (keyword/0) - options to use Redis Sentinel. If this option is present, you cannot use the :host and :port options. See the Sentinel Options section below.
Sentinel Options
:sentinels (list of String.t/0 or keyword/0) - Required. A list of sentinel addresses. Each element in this list is the address of a sentinel to be contacted in order to obtain the address of a primary. The address of a sentinel can be passed as a Redis URI (see the "Redis URI" section above) or a keyword list with :host, :port, :password options (same as when connecting to a Redis instance directly). Note that the password can either be passed in the sentinel address or globally; see the :password option below.
:group (String.t/0) - Required. The name of the group that identifies the primary in the sentinel configuration.
:role (Redix.sentinel_role/0) - if :primary, the connection will be established with the primary for the given group. If :replica, Redix will ask the sentinel for all the available replicas for the given group and try to connect to one of them at random. The default value is :primary.
:socket_opts (keyword/0) - socket options for connecting to each sentinel. Same as the :socket_opts option described above. The default value is [].
:timeout (timeout/0) - the timeout (in milliseconds or :infinity) that will be used to interact with the sentinels. This timeout will be used as the timeout when connecting to each sentinel and when asking sentinels for a primary. The Redis documentation suggests keeping this timeout short so that connection to Redis can happen quickly. The default value is 500.
:ssl (boolean/0) - whether to use SSL to connect to each sentinel. The default value is false.
:password (Redix.password/0) - if you don't want to specify a password for each sentinel you list, you can use this option to specify a password that will be used to authenticate on sentinels if they don't specify a password. This option is recommended over passing a password for each sentinel because in the future we might do sentinel auto-discovery, which means authentication can only be done through a global password that works for all sentinels.
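As an illustration of how the connection options and the Sentinel options nest, here is a sketch of an options list. The host names, group name, and process name are made up for the example; only the option keys come from the lists above.

```elixir
# Hypothetical values throughout; the keys are the documented options.
pubsub_opts = [
  name: :my_pubsub,
  sentinel: [
    # Each sentinel is either a Redis URI or a keyword list.
    sentinels: [
      "redis://sentinel1.example.com:26379",
      [host: "sentinel2.example.com", port: 26379]
    ],
    group: "main",
    role: :replica,
    timeout: 500
  ],
  # MFA form: the Redis password is read from the environment when connecting,
  # which keeps it out of crash reports.
  password: {System, :fetch_env!, ["REDIX_PASSWORD"]},
  backoff_initial: 500,
  backoff_max: 30_000
]

# With Redix installed and the sentinels reachable, this list would be passed as:
# {:ok, pubsub} = Redix.PubSub.start_link(pubsub_opts)
```

Note that :host and :port are absent at the top level, since they cannot be combined with :sentinel.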
Examples
iex> Redix.PubSub.start_link()
{:ok, #PID<...>}
iex> Redix.PubSub.start_link(host: "example.com", port: 9999, password: "secret")
{:ok, #PID<...>}
iex> Redix.PubSub.start_link(database: 3, name: :redix_3)
{:ok, #PID<...>}
Same as start_link/1 but using both a Redis URI and a list of options.
In this case, options specified in opts have precedence over values specified by uri.
For example, if uri is redis://example1.com but opts is [host: "example2.com"], then example2.com will be used as the host when connecting.
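The precedence rule behaves like a keyword-list merge in which the explicit options win. The sketch below only models that rule and is not taken from Redix's implementation; PrecedenceSketch and effective/2 are hypothetical names, and uri_opts stands for whatever options would be derived from the URI.

```elixir
defmodule PrecedenceSketch do
  @moduledoc "Hypothetical helper for illustration; not part of Redix."

  # Keyword.merge/2 keeps the value from its second argument when a key
  # appears in both lists, so explicit `opts` override `uri_opts`.
  def effective(uri_opts, opts) when is_list(uri_opts) and is_list(opts) do
    Keyword.merge(uri_opts, opts)
  end
end
```

Options that appear only in the URI, such as a port, are kept as they are.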
Stops the given pub/sub process.
This function is synchronous and blocks until the given pub/sub connection frees all its resources and disconnects from the Redis server. timeout can be passed to limit the amount of time allowed for the connection to exit; if it doesn't exit in the given interval, this call exits.
Examples
iex> Redix.PubSub.stop(conn)
:ok
@spec subscribe(connection(), String.t() | [String.t()], subscriber()) :: {:ok, reference()}
Subscribes subscriber to the given channel or list of channels.
Subscribes subscriber (which can be anything that can be passed to send/2) to channels, which can be a single channel or a list of channels.
For each of the channels in channels which subscriber successfully subscribes to, a message will be sent to subscriber with this form:
{:redix_pubsub, pid, subscription_ref, :subscribed, %{channel: channel}}
See the documentation for Redix.PubSub for more information about the format of messages.
Examples
iex> Redix.PubSub.subscribe(conn, ["foo", "bar"], self())
{:ok, subscription_ref}
iex> flush()
{:redix_pubsub, ^conn, ^subscription_ref, :subscribed, %{channel: "foo"}}
{:redix_pubsub, ^conn, ^subscription_ref, :subscribed, %{channel: "bar"}}
:ok
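Since one confirmation arrives per channel, a caller that subscribes to a list may want to wait until all of them are confirmed. The following is a minimal sketch of such a wait. SubscribeAwait is a hypothetical helper, not part of Redix, and it assumes pubsub is the pid of the Redix.PubSub process rather than a registered name.

```elixir
defmodule SubscribeAwait do
  @moduledoc "Hypothetical helper for illustration; not part of Redix."

  # Blocks until a :subscribed confirmation has arrived for every channel.
  # `timeout` applies to each individual wait, not to the whole call.
  def await_subscribed(pubsub, ref, channels, timeout \\ 5_000) when is_pid(pubsub) do
    do_await(pubsub, ref, MapSet.new(List.wrap(channels)), timeout)
  end

  defp do_await(pubsub, ref, pending, timeout) do
    if MapSet.size(pending) == 0 do
      :ok
    else
      receive do
        {:redix_pubsub, ^pubsub, ^ref, :subscribed, %{channel: channel}} ->
          do_await(pubsub, ref, MapSet.delete(pending, channel), timeout)
      after
        # Report which channels never got confirmed.
        timeout -> {:error, {:timeout, MapSet.to_list(pending)}}
      end
    end
  end
end
```

A typical call would follow subscribe/3 directly, passing the returned reference and the same list of channels.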
@spec unsubscribe(connection(), String.t() | [String.t()], subscriber()) :: :ok
Unsubscribes subscriber from the given channel or list of channels.
This function basically "undoes" what subscribe/3 does: it unsubscribes subscriber from the given channel or list of channels.
Upon successful unsubscription from each of the channels, a message will be sent to subscriber with the following form:
{:redix_pubsub, pid, ^subscription_ref, :unsubscribed, %{channel: channel}}
See the documentation for Redix.PubSub for more information about the format of messages.
Examples
iex> Redix.PubSub.unsubscribe(conn, ["foo", "bar"], self())
:ok
iex> flush()
{:redix_pubsub, ^conn, ^subscription_ref, :unsubscribed, %{channel: "foo"}}
{:redix_pubsub, ^conn, ^subscription_ref, :unsubscribed, %{channel: "bar"}}
:ok