Vdr.RedisStream.Replica (veidrodelis v0.1.6)

Copy Markdown View Source

Redis replication client that connects to a Redis and receives replication stream via PSYNC.

The replica manages a state machine for the replication protocol:

  1. Connect to Redis (TCP/SSL) or discover via Sentinel
  2. Send PING
  3. Authenticate (if password provided)
  4. Negotiate PSYNC
  5. Receive and parse RDB snapshot
  6. Receive stream of commands

Replica is parametrized by a callback module that implements the Vdr.RedisStream.Callback behaviour.

This module is a part of public API to allow users to implement their own replication handlers. However, be aware that the RDB snapshot parser currently skips data entries that are not related to string, list, set, sorted set or hash data types.

In this project, this module is used with Vdr.TSProj callback module that builds an in-memory projection of the Redis data related to the string, list, set, sorted set or hash data types.

Simple Logging Replica

Example callback module that logs all replicated commands

defmodule LoggingCallback do
  @behaviour Vdr.RedisStream.Callback
  require Logger

  @impl Vdr.RedisStream.Callback
  def init(_opts) do
    {:ok, %{}}
  end

  @impl Vdr.RedisStream.Callback
  def handle_replication_start(state) do
    Logger.info("Replication started")
    {:ok, state}
  end

  @impl Vdr.RedisStream.Callback
  def handle_streaming_start(state) do
    Logger.info("Command streaming started")
    {:ok, state}
  end

  @impl Vdr.RedisStream.Callback
  def handle_commands(state, replica_commands) do
    # Log each command as it arrives
    Enum.each(replica_commands, fn cmd ->
      Logger.debug("Received command: db=#{cmd.db} cmd=#{inspect(cmd.command)}")
    end)

    {:ok, state}
  end

  @impl Vdr.RedisStream.Callback
  def handle_call(state, _message) do
    {:reply, :ok, state}
  end

  @impl Vdr.RedisStream.Callback
  def handle_info(state, _message) do
    {:noreply, state}
  end

  @impl Vdr.RedisStream.Callback
  def handle_destroy(_state) do
    Logger.info("Replica shutting down")
    :ok
  end
end

# Start the logging replica
{:ok, replica} = Vdr.RedisStream.Replica.start_link(
  host: "localhost",
  port: 6379,
  callback_module: LoggingCallback,
  callback_opts: %{}
)

Summary

Functions

Make a synchronous call to the replica's callback module.

Creates a child specification for the replica for running it under a supervisor.

Get the host and port of the currently connected Redis server.

Get the current callback state.

Get the current replication offset.

Get the current replication ID.

Get the current replication state.

Start a Redis replica client.

Stop the replica client.

Types

replica_state()

@type replica_state() ::
  :init
  | :ping
  | :auth
  | :replconf_listening_port
  | :replconf_capa
  | :psync
  | :rdb_transfer
  | :streaming

Functions

call(server, message, timeout \\ 5000)

@spec call(GenServer.server(), term(), non_neg_integer()) ::
  {:ok, term()} | {:error, term()}

Make a synchronous call to the replica's callback module.

This will invoke the callback module's handle_call/2 function with the provided message, allowing you to query or interact with the callback state.

The call will only succeed if the replica is in a valid state (after replication has started but before termination). If called during initialization or after disconnection, it will return {:error, :not_connected}.

Parameters

  • server - The replica GenServer PID or name
  • message - The message to pass to the callback's handle_call/2
  • timeout - The timeout for the call (default: 5000ms)

Returns

  • {:ok, reply} - Success with reply from callback
  • {:error, :not_implemented} - Callback doesn't implement handle_call/2
  • {:error, :not_connected} - Replica not in valid state
  • {:error, reason} - Other error from callback

child_spec(init_arg)

@spec child_spec(keyword()) :: Supervisor.child_spec()

Creates a child specification for the replica for running it under a supervisor.

connected_to(server)

@spec connected_to(GenServer.server()) ::
  {:ok, {String.t(), non_neg_integer()}} | {:error, :not_connected}

Get the host and port of the currently connected Redis server.

Returns the actual host and port that the replica is connected to. For sentinel-based connections, this returns the discovered server address. For direct connections, this returns the configured host and port.

Returns

  • {:ok, {host, port}} - The connected host (string) and port (integer)
  • {:error, :not_connected} - Replica is not currently connected

get_callback_state(server)

@spec get_callback_state(GenServer.server()) :: term()

Get the current callback state.

get_offset(server)

@spec get_offset(GenServer.server()) :: integer()

Get the current replication offset.

get_replication_id(server)

@spec get_replication_id(GenServer.server()) :: binary() | nil

Get the current replication ID.

get_replication_state(server)

@spec get_replication_state(GenServer.server()) :: replica_state()

Get the current replication state.

start_link(opts)

@spec start_link(keyword()) :: GenServer.on_start()

Start a Redis replica client.

Options

Connection Options (mutually exclusive)

  • :host - Redis host (default: "localhost"). Cannot be used with :sentinel.
  • :port - Redis port (default: 6379). Cannot be used with :sentinel.
  • :sentinel - Sentinel configuration (keyword list). Cannot be used with :host/:port. Requires :redix dependency at runtime.
    • :sentinels - List of sentinel nodes (required), each with :host and :port
    • :group - Name of the primary group in sentinel (required)
    • :role - Server role to discover: :primary or :replica (default: :primary)
    • :connect_opts - Redix connection options for sentinel connections (optional). Supports all Redix options like :timeout, :ssl, :password, etc.
    • :replica_connect_opts - Redix connection options for discovered Redis server (optional). Supports all Redix options like :username, :password, :ssl, :socket_opts, etc.
    • :host_map - Mapping for translating sentinel-returned hostnames (helpful for testing). Can be a map or a function that takes a hostname and returns a new hostname.

Redis Server Options (apply to direct connections only)

  • :username - Redis username for ACL authentication (default: nil). Not used with sentinel.
  • :password - Redis password (default: nil). Not used with sentinel.
  • :ssl - Use SSL/TLS for Redis connection (default: false). Not used with sentinel.
  • :ssl_opts - SSL options (default: []). Not used with sentinel.

Callback Options

  • :callback_module - Module implementing Vdr.RedisStream.Callback (required)
  • :callback_opts - Options for the callback module (required)

Other Options

  • :name - GenServer name (optional)
  • :reconnect - Enable automatic reconnection (default: true)
  • :reconnect_delay_ms - Initial delay before reconnection in ms (default: 1000)
  • :max_reconnect_delay_ms - Maximum delay between reconnection attempts in ms (default: 30000)
  • :ack_interval_ms - Interval for sending periodic REPLCONF ACK to the primary in ms (default: 1000).
  • :command_filter - Command filter to apply to commands (default: none)

Authentication

For Redis 6+ ACL authentication, provide both :username and :password. For older Redis versions, provide only :password.

Sentinel Support

When using Redis Sentinel for high availability, provide the :sentinel option instead of :host and :port. The replica will:

  1. Query sentinels sequentially to discover the primary (or replica) address
  2. Connect to the discovered Redis server
  3. Verify the server role matches the expected role
  4. On reconnection, repeat the discovery process (automatically handling failovers)

The :role option determines which type of server to discover:

  • :primary - Connect to the primary server (typical for replication)
  • :replica - Connect to a replica server (for read-only replication)

Reconnection

When enabled, the replica will automatically attempt to reconnect on connection failures or disconnects. It will use exponential backoff starting from :reconnect_delay_ms up to :max_reconnect_delay_ms. The replica will attempt partial resync (PSYNC) when possible to avoid full RDB transfer.

When using Sentinel, the reconnection process includes rediscovery, allowing the replica to automatically adapt to failovers and topology changes.

Examples

Direct Connection

Basic connection

opts = [
  host: "localhost",
  port: 6379,
  callback_module: MyCallback,
  callback_opts: %{}
]
{:ok, replica} = Vdr.RedisStream.Replica.start_link(opts)

With ACL authentication

opts = [
  host: "localhost",
  port: 6379,
  username: "myuser",
  password: "mypassword",
  callback_module: MyCallback,
  callback_opts: %{}
]
{:ok, replica} = Vdr.RedisStream.Replica.start_link(opts)

Sentinel Connection

Connect to primary via sentinel

opts = [
  sentinel: [
    sentinels: [
      [host: "sentinel1", port: 26379],
      [host: "sentinel2", port: 26379],
      [host: "sentinel3", port: 26379]
    ],
    group: "myprimary",
    role: :primary,
    # Optional: Connection options for sentinel connections
    connect_opts: [timeout: 1000, ssl: true],
    # Optional: Connection options for replica connections
    replica_connect_opts: [password: "redis_password", ssl: true]
  ],
  callback_module: MyCallback,
  callback_opts: %{}
]
{:ok, replica} = Vdr.RedisStream.Replica.start_link(opts)

Connect to replica via sentinel

opts = [
  sentinel: [
    sentinels: [
      [host: "sentinel1", port: 26379],
      [host: "sentinel2", port: 26379],
      [host: "sentinel3", port: 26379]
    ],
    group: "myprimary",
    role: :replica,
    replica_connect_opts: [password: "redis_password"]
  ],
  callback_module: MyCallback,
  callback_opts: %{}
]
{:ok, replica} = Vdr.RedisStream.Replica.start_link(opts)

Returns

  • {:ok, pid} - Successfully started
  • {:error, reason} - Failed to start

stop(server)

@spec stop(GenServer.server()) :: :ok

Stop the replica client.