# `Vdr.RedisStream.Replica`
[🔗](https://github.com/savonarola/veidrodelis/blob/v0.1.6/lib/veidrodelis/redis_stream/replica.ex#L1)

Redis replication client that connects to a Redis server and receives
the replication stream via PSYNC.

The replica manages a state machine for the replication protocol:
1. Connect to Redis (TCP/SSL) or discover via Sentinel
2. Send PING
3. Authenticate (if password provided)
4. Negotiate PSYNC
5. Receive and parse RDB snapshot
6. Receive stream of commands

The replica is parameterized by a callback module that implements the `Vdr.RedisStream.Callback` behaviour.

This module is part of the public API so that users can implement their own replication handlers.
Be aware, however, that the RDB snapshot parser currently skips data entries whose type is not
string, list, set, sorted set, or hash.

In this project, the module is used with the `Vdr.TSProj` callback module, which builds an in-memory
projection of Redis data of the string, list, set, sorted set, and hash types.

### Simple Logging Replica

Example callback module that logs all replicated commands:

```elixir
defmodule LoggingCallback do
  @behaviour Vdr.RedisStream.Callback
  require Logger

  @impl Vdr.RedisStream.Callback
  def init(_opts) do
    {:ok, %{}}
  end

  @impl Vdr.RedisStream.Callback
  def handle_replication_start(state) do
    Logger.info("Replication started")
    {:ok, state}
  end

  @impl Vdr.RedisStream.Callback
  def handle_streaming_start(state) do
    Logger.info("Command streaming started")
    {:ok, state}
  end

  @impl Vdr.RedisStream.Callback
  def handle_commands(state, replica_commands) do
    # Log each command as it arrives
    Enum.each(replica_commands, fn cmd ->
      Logger.debug("Received command: db=#{cmd.db} cmd=#{inspect(cmd.command)}")
    end)

    {:ok, state}
  end

  @impl Vdr.RedisStream.Callback
  def handle_call(state, _message) do
    {:reply, :ok, state}
  end

  @impl Vdr.RedisStream.Callback
  def handle_info(state, _message) do
    {:noreply, state}
  end

  @impl Vdr.RedisStream.Callback
  def handle_destroy(_state) do
    Logger.info("Replica shutting down")
    :ok
  end
end

# Start the logging replica
{:ok, replica} = Vdr.RedisStream.Replica.start_link(
  host: "localhost",
  port: 6379,
  callback_module: LoggingCallback,
  callback_opts: %{}
)
```

# `replica_state`

```elixir
@type replica_state() ::
  :init
  | :ping
  | :auth
  | :replconf_listening_port
  | :replconf_capa
  | :psync
  | :rdb_transfer
  | :streaming
```

# `call`

```elixir
@spec call(GenServer.server(), term(), non_neg_integer()) ::
  {:ok, term()} | {:error, term()}
```

Make a synchronous call to the replica's callback module.

This will invoke the callback module's `handle_call/2` function with the provided
message, allowing you to query or interact with the callback state.

The call will only succeed if the replica is in a valid state (after replication
has started but before termination). If called during initialization or after
disconnection, it will return `{:error, :not_connected}`.

## Parameters

  * `server` - The replica GenServer PID or name
  * `message` - The message to pass to the callback's `handle_call/2`
  * `timeout` - The timeout for the call (default: 5000ms)

## Returns

  * `{:ok, reply}` - Success with reply from callback
  * `{:error, :not_implemented}` - Callback doesn't implement `handle_call/2`
  * `{:error, :not_connected}` - Replica not in valid state
  * `{:error, reason}` - Other error from callback
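
As a minimal sketch of how `call/3` pairs with `handle_call/2`, the hypothetical `CountingCallback` below keeps a running count of replicated commands and returns it on request. The module name, the `:count` message, the `:unknown_message` reply, and the state shape are illustrative assumptions; the callback signatures and return values mirror the `LoggingCallback` example in this module's documentation.

```elixir
defmodule CountingCallback do
  # Hypothetical callback: counts replicated commands and reports the total.
  @behaviour Vdr.RedisStream.Callback

  @impl Vdr.RedisStream.Callback
  def init(_opts), do: {:ok, %{count: 0}}

  @impl Vdr.RedisStream.Callback
  def handle_replication_start(state), do: {:ok, state}

  @impl Vdr.RedisStream.Callback
  def handle_streaming_start(state), do: {:ok, state}

  @impl Vdr.RedisStream.Callback
  def handle_commands(state, replica_commands) do
    # Add the size of the incoming batch to the running total.
    {:ok, %{state | count: state.count + length(replica_commands)}}
  end

  @impl Vdr.RedisStream.Callback
  def handle_call(state, :count), do: {:reply, state.count, state}
  # Any other message gets a placeholder reply (an assumption of this sketch).
  def handle_call(state, _message), do: {:reply, :unknown_message, state}

  @impl Vdr.RedisStream.Callback
  def handle_info(state, _message), do: {:noreply, state}

  @impl Vdr.RedisStream.Callback
  def handle_destroy(_state), do: :ok
end
```

With a replica started using `callback_module: CountingCallback`, `Vdr.RedisStream.Replica.call(replica, :count, 5_000)` would be expected to return `{:ok, count}` once replication has started, and `{:error, :not_connected}` before that.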

# `child_spec`

```elixir
@spec child_spec(keyword()) :: Supervisor.child_spec()
```

Creates a child specification for running the replica under a supervisor.
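
A sketch of supervising the replica, assuming the usual `{module, opts}` child tuple is how `child_spec/1` is meant to be consumed. `MyApp.ReplicaSupervisor` and its `children/1` helper are hypothetical names; the options passed in are the same keyword list accepted by `start_link/1`.

```elixir
defmodule MyApp.ReplicaSupervisor do
  # Hypothetical supervisor; only `Vdr.RedisStream.Replica` comes from the library.
  use Supervisor

  def start_link(replica_opts) do
    Supervisor.start_link(__MODULE__, replica_opts, name: __MODULE__)
  end

  # The `{module, opts}` tuple makes the supervisor call
  # `Vdr.RedisStream.Replica.child_spec(opts)` when it initializes.
  def children(replica_opts), do: [{Vdr.RedisStream.Replica, replica_opts}]

  @impl Supervisor
  def init(replica_opts) do
    Supervisor.init(children(replica_opts), strategy: :one_for_one)
  end
end
```

The supervisor would then be started with, for example, `MyApp.ReplicaSupervisor.start_link(host: "localhost", port: 6379, callback_module: MyCallback, callback_opts: %{})`.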

# `connected_to`

```elixir
@spec connected_to(GenServer.server()) ::
  {:ok, {String.t(), non_neg_integer()}} | {:error, :not_connected}
```

Get the host and port of the currently connected Redis server.

Returns the actual host and port that the replica is connected to. For sentinel-based
connections, this returns the discovered server address. For direct connections, this
returns the configured host and port.

## Returns

  * `{:ok, {host, port}}` - The connected host (string) and port (integer)
  * `{:error, :not_connected}` - Replica is not currently connected
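
The two documented return shapes can be handled with a pattern match. The `ReplicaAddress` module below is a hypothetical helper written for illustration, not part of the library:

```elixir
defmodule ReplicaAddress do
  # Hypothetical helper that renders the result of
  # `Vdr.RedisStream.Replica.connected_to/1` as a string.
  def format({:ok, {host, port}}), do: "#{host}:#{port}"
  def format({:error, :not_connected}), do: "not connected"

  # Queries the replica and formats whatever it reports.
  def describe(replica) do
    replica |> Vdr.RedisStream.Replica.connected_to() |> format()
  end
end
```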

# `get_callback_state`

```elixir
@spec get_callback_state(GenServer.server()) :: term()
```

Get the current callback state.

# `get_offset`

```elixir
@spec get_offset(GenServer.server()) :: integer()
```

Get the current replication offset.

# `get_replication_id`

```elixir
@spec get_replication_id(GenServer.server()) :: binary() | nil
```

Get the current replication ID.

# `get_replication_state`

```elixir
@spec get_replication_state(GenServer.server()) :: replica_state()
```

Get the current replication state.
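
One possible use is waiting until the replica reaches `:streaming` before relying on its data. The sketch below polls `get_replication_state/1`; the `ReplicaWait` module, its defaults, and the `{:error, :timeout}` result are assumptions made for illustration, and the state-fetching function is injectable so the loop can be tried without a Redis server.

```elixir
defmodule ReplicaWait do
  # Hypothetical helper: polls the replication state until it is `:streaming`.
  def await_streaming(
        replica,
        attempts \\ 50,
        interval_ms \\ 100,
        fetch_state \\ &Vdr.RedisStream.Replica.get_replication_state/1
      )

  # No attempts left: give up.
  def await_streaming(_replica, 0, _interval_ms, _fetch_state), do: {:error, :timeout}

  def await_streaming(replica, attempts, interval_ms, fetch_state) do
    case fetch_state.(replica) do
      :streaming ->
        :ok

      _other_state ->
        # Still connecting, authenticating, or transferring the RDB snapshot.
        Process.sleep(interval_ms)
        await_streaming(replica, attempts - 1, interval_ms, fetch_state)
    end
  end
end
```

Calling `ReplicaWait.await_streaming(replica)` with the defaults waits roughly five seconds at most before returning `{:error, :timeout}`.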

# `start_link`

```elixir
@spec start_link(keyword()) :: GenServer.on_start()
```

Start a Redis replica client.

## Options

### Connection Options (mutually exclusive)

  * `:host` - Redis host (default: "localhost"). Cannot be used with `:sentinel`.
  * `:port` - Redis port (default: 6379). Cannot be used with `:sentinel`.
  * `:sentinel` - Sentinel configuration (keyword list). Cannot be used with `:host`/`:port`.
    Requires `:redix` dependency at runtime.
    * `:sentinels` - List of sentinel nodes (required), each with `:host` and `:port`
    * `:group` - Name of the primary group in sentinel (required)
    * `:role` - Server role to discover: `:primary` or `:replica` (default: `:primary`)
    * `:connect_opts` - Redix connection options for sentinel connections (optional).
      Supports all Redix options like `:timeout`, `:ssl`, `:password`, etc.
    * `:replica_connect_opts` - Redix connection options for discovered Redis server (optional).
      Supports all Redix options like `:username`, `:password`, `:ssl`, `:socket_opts`, etc.
    * `:host_map` - Mapping for translating sentinel-returned hostnames (helpful for testing).
      Can be a map or a function that takes a hostname and returns a new hostname.

### Redis Server Options (apply to direct connections only)

  * `:username` - Redis username for ACL authentication (default: nil). Not used with sentinel.
  * `:password` - Redis password (default: nil). Not used with sentinel.
  * `:ssl` - Use SSL/TLS for Redis connection (default: false). Not used with sentinel.
  * `:ssl_opts` - SSL options (default: []). Not used with sentinel.

### Callback Options

  * `:callback_module` - Module implementing `Vdr.RedisStream.Callback` (required)
  * `:callback_opts` - Options for the callback module (required)

### Other Options

  * `:name` - GenServer name (optional)
  * `:reconnect` - Enable automatic reconnection (default: true)
  * `:reconnect_delay_ms` - Initial delay before reconnection in ms (default: 1000)
  * `:max_reconnect_delay_ms` - Maximum delay between reconnection attempts in ms (default: 30000)
  * `:ack_interval_ms` - Interval for sending periodic REPLCONF ACK to the primary in ms (default: 1000)
  * `:command_filter` - Filter applied to replicated commands (default: none)

## Authentication

For Redis 6+ ACL authentication, provide both `:username` and `:password`.
For older Redis versions, provide only `:password`.

## Sentinel Support

When using Redis Sentinel for high availability, provide the `:sentinel` option instead of
`:host` and `:port`. The replica will:

1. Query sentinels sequentially to discover the primary (or replica) address
2. Connect to the discovered Redis server
3. Verify the server role matches the expected role
4. On reconnection, repeat the discovery process (automatically handling failovers)

The `:role` option determines which type of server to discover:
- `:primary` - Connect to the primary server (typical for replication)
- `:replica` - Connect to a replica server (for read-only replication)
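
The sequential discovery in step 1, together with `:host_map` translation, can be sketched as follows. This is not the library's implementation: `SentinelDiscoverySketch` is a hypothetical module, `query` stands in for whatever actually asks a sentinel for an address, and leaving unmapped hostnames unchanged is an assumption. Role verification (step 3) is not covered.

```elixir
defmodule SentinelDiscoverySketch do
  # Illustrative only: `query` stands in for whatever asks one sentinel for
  # the address of `group`; it must return {:ok, {host, port}} or {:error, reason}.
  def discover(sentinels, group, query, host_map \\ %{}) do
    # Try sentinels in order and stop at the first one that answers.
    Enum.find_value(sentinels, {:error, :no_sentinel_available}, fn sentinel ->
      case query.(sentinel, group) do
        {:ok, {host, port}} -> {:ok, {translate(host_map, host), port}}
        {:error, _reason} -> nil
      end
    end)
  end

  # `:host_map` may be a map or a one-argument function.
  defp translate(host_map, host) when is_function(host_map, 1), do: host_map.(host)
  # Assumption: hostnames missing from the map are passed through unchanged.
  defp translate(host_map, host) when is_map(host_map), do: Map.get(host_map, host, host)
end
```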

## Reconnection

When enabled, the replica will automatically attempt to reconnect on connection failures
or disconnects. It will use exponential backoff starting from `:reconnect_delay_ms` up to
`:max_reconnect_delay_ms`. The replica will attempt partial resync (PSYNC) when possible
to avoid full RDB transfer.
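
To give a feel for the backoff schedule, the sketch below computes successive delays under the assumption that the delay doubles after each failed attempt. The growth factor is not stated here, so treat the doubling (and the `BackoffSketch` module itself) as illustrative.

```elixir
defmodule BackoffSketch do
  # Assumes the delay doubles after each failed attempt; the actual growth
  # factor used by the replica is not stated in this documentation.
  def delays(initial_ms, max_ms, attempts) do
    initial_ms
    |> min(max_ms)
    |> Stream.iterate(&min(&1 * 2, max_ms))
    |> Enum.take(attempts)
  end
end
```

With the default options, `BackoffSketch.delays(1000, 30000, 7)` yields `[1000, 2000, 4000, 8000, 16000, 30000, 30000]`.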

When using Sentinel, the reconnection process includes rediscovery, allowing the replica
to automatically adapt to failovers and topology changes.

## Examples

### Direct Connection

Basic connection:
```elixir
opts = [
  host: "localhost",
  port: 6379,
  callback_module: MyCallback,
  callback_opts: %{}
]
{:ok, replica} = Vdr.RedisStream.Replica.start_link(opts)
```

With ACL authentication:
```elixir
opts = [
  host: "localhost",
  port: 6379,
  username: "myuser",
  password: "mypassword",
  callback_module: MyCallback,
  callback_opts: %{}
]
{:ok, replica} = Vdr.RedisStream.Replica.start_link(opts)
```

### Sentinel Connection

Connect to the primary via sentinel:
```elixir
opts = [
  sentinel: [
    sentinels: [
      [host: "sentinel1", port: 26379],
      [host: "sentinel2", port: 26379],
      [host: "sentinel3", port: 26379]
    ],
    group: "myprimary",
    role: :primary,
    # Optional: Connection options for sentinel connections
    connect_opts: [timeout: 1000, ssl: true],
    # Optional: Connection options for replica connections
    replica_connect_opts: [password: "redis_password", ssl: true]
  ],
  callback_module: MyCallback,
  callback_opts: %{}
]
{:ok, replica} = Vdr.RedisStream.Replica.start_link(opts)
```

Connect to a replica via sentinel:
```elixir
opts = [
  sentinel: [
    sentinels: [
      [host: "sentinel1", port: 26379],
      [host: "sentinel2", port: 26379],
      [host: "sentinel3", port: 26379]
    ],
    group: "myprimary",
    role: :replica,
    replica_connect_opts: [password: "redis_password"]
  ],
  callback_module: MyCallback,
  callback_opts: %{}
]
{:ok, replica} = Vdr.RedisStream.Replica.start_link(opts)
```

## Returns

  * `{:ok, pid}` - Successfully started
  * `{:error, reason}` - Failed to start

# `stop`

```elixir
@spec stop(GenServer.server()) :: :ok
```

Stop the replica client.

