View Source Module eredis_sub

Data Types

channel()

channel() = binary()

obfuscated()

obfuscated() = fun(() -> iodata())

option()

option() = {host, string() | {local, string()}} | {port, inet:port_number()} | {database, integer()} | {username, iodata() | obfuscated() | undefined} | {password, iodata() | obfuscated() | undefined} | {reconnect_sleep, reconnect_sleep()} | {connect_timeout, integer()} | {socket_options, list()} | {tls, [ssl:tls_client_option()]} | {name, registered_name()} | {sentinel, list()}

reconnect_sleep()

reconnect_sleep() = no_reconnect | integer()

registered_name()

registered_name() = {local, atom()} | {global, term()} | {via, atom(), term()}

sub_option()

sub_option() = {max_queue_size, integer() | infinity} | {queue_behaviour, drop | exit} | option()

sub_options()

sub_options() = [sub_option()]

Function Index

ack_message/1 acknowledge the receipt of a pubsub message.
channels/1 Returns the channels the given client is currentlysubscribing to.
controlling_process/1 Make the calling process the controlling process.
controlling_process/2 Make the given process (pid) the controlling process.
controlling_process/3 Make the given process (pid) the controlling process subscriber with the given Timeout.
psubscribe/2 Pattern subscribe to the given channels.
punsubscribe/2
start_link/0
start_link/1Start with options in proplist format.
start_link/3(Deprecated.)
start_link/6(Deprecated.)
stop/1
subscribe/2 Subscribe to the given channels.
unsubscribe/2

Function Details

ack_message/1

ack_message(Client::pid()) -> ok

acknowledge the receipt of a pubsub message. each pubsub message must be acknowledged before the next one is received

channels/1

channels(Client) -> any()

Returns the channels the given client is currently subscribing to. Note: this list is based on the channels at startup and any channel added during runtime. It might not immediately reflect the channels Redis thinks the client is subscribed to.

controlling_process/1

controlling_process(Client::pid()) -> ok

Make the calling process the controlling process. The controlling process received pubsub-related messages, of which there are three kinds. In each message, the pid refers to the eredis client process.

{message, Channel::binary(), Message::binary(), pid()} This is sent for each pubsub message received by the client.

{pmessage, Pattern::binary(), Channel::binary(), Message::binary(), pid()} This is sent for each pattern pubsub message received by the client.

{dropped, NumMessages::integer(), pid()} If the queue reaches the max size as specified in start_link and the behaviour is to drop messages, this message is sent when the queue is flushed.

{subscribed, Channel::binary(), pid()} When using eredis_sub:subscribe(pid()), this message will be sent for each channel Redis aknowledges the subscription. The opposite, 'unsubscribed' is sent when Redis aknowledges removal of a subscription.

{eredis_disconnected, pid()} This is sent when the eredis client is disconnected from redis.

{eredis_connected, pid()} This is sent when the eredis client reconnects to redis after an existing connection was disconnected.

Any message of the form {message, , , _} must be acknowledged before any subsequent message of the same form is sent. This prevents the controlling process from being overrun with redis pubsub messages. See ack_message/1.

controlling_process/2

controlling_process(Client::pid(), Pid::pid()) -> ok

Make the given process (pid) the controlling process.

controlling_process/3

controlling_process(Client, Pid, Timeout) -> any()

Make the given process (pid) the controlling process subscriber with the given Timeout.

psubscribe/2

psubscribe(Client::pid(), Channels::[channel()]) -> ok

Pattern subscribe to the given channels. Returns immediately. The result will be delivered to the controlling process as any other message. Delivers {subscribed, Channel::binary(), pid()}

punsubscribe/2

punsubscribe(Client::pid(), Channels::[channel()]) -> ok

start_link() -> any()

start_link(Options::sub_options()) -> {ok, Pid::pid()} | {error, Reason::term()}

Options:

{host, Host}
DNS name or IP address as string; or unix domain socket as {local, Path} (available in OTP 19+); default "127.0.0.1"
{port, Port}
Integer, default is 6379
{database, Database}
Integer; 0 for the default database
{username, Username}
A 0-ary function that returns the username (the preferred way to provide username as it prevents the actual secret from appearing in logs and stacktraces), a string or iodata or the atomundefined for no username; default undefined
{password, Password}
A 0-ary function that returns the password (the preferred way to provide password as it prevents the actual secret from appearing in logs and stacktraces), a string or iodata or the atomundefined for no username; default undefined
{reconnect_sleep, ReconnectSleep}
Integer of milliseconds to sleep between reconnect attempts; default: 100
{connect_timeout, Timeout}
Timeout value in milliseconds to use when connecting to Redis; default: 5000
{socket_options, SockOpts}
List ofgen_tcp options used when connecting the socket; default is ?SOCKET_OPTS
{tls, TlsOpts}
Enabling TLS and a list ofssl options; used when establishing a TLS connection; default is off
{name, Name}
Tuple to register the client with a name such as {local, atom()}; for all options see ServerName atgen_server:start_link/4; default: no name
{max_queue_size, N}
Queue size for incoming pubsub messages
{queue_behaviour, drop | exit}
What to do if the controlling process doesn't ack pubsub messages fast enough

.

Start with options in proplist format.

start_link(Host, Port, Password) -> any()

This function is deprecated: Use start_link/1 instead.

start_link(Host, Port, Password, ReconnectSleep, MaxQueueSize, QueueBehaviour) -> any()

This function is deprecated: Use start_link/1 instead.

stop/1

stop(Pid) -> any()

subscribe/2

subscribe(Client::pid(), Channels::[channel()]) -> ok

Subscribe to the given channels. Returns immediately. The result will be delivered to the controlling process as any other message. Delivers {subscribed, Channel::binary(), pid()}

unsubscribe/2

unsubscribe(Client::pid(), Channels::[channel()]) -> ok