Pulsar.ProducerEpochStore (Pulsar v2.8.11)


Manages storage of producer topic epochs so that they survive producer restarts.

This module provides an abstraction layer over ETS for storing and retrieving producer topic epochs. The epochs are used to detect when a producer has been fenced by a newer producer that uses the ExclusiveWithFencing access mode.

Each client maintains its own isolated epoch store, created when the client starts and cleaned up when the client stops.
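The per-client isolation described above can be pictured with a minimal sketch of an ETS-backed store. Everything here is an assumption made for illustration: the module name `EpochStoreSketch`, the table naming scheme, the `{topic, producer_name, access_mode}` key shape, and the table options are hypothetical and are not taken from the library's source. Only the return values follow the contracts documented on this page.

```elixir
defmodule EpochStoreSketch do
  # Hypothetical stand-in for illustration; not the library's implementation.

  # Assumed naming scheme: one named ETS table per client.
  def table_name(client_name), do: :"#{client_name}_producer_epochs_sketch"

  # Creates the client's table unless it already exists, so repeated calls are safe.
  def init(client_name) do
    table = table_name(client_name)

    if :ets.whereis(table) == :undefined do
      :ets.new(table, [:named_table, :public, :set])
    end

    :ok
  end

  # Assumed key shape: {topic, producer_name, access_mode}.
  def put(client_name, topic, producer_name, access_mode, epoch) do
    with_table(fn ->
      :ets.insert(table_name(client_name), {{topic, producer_name, access_mode}, epoch})
      :ok
    end)
  end

  def get(client_name, topic, producer_name, access_mode) do
    with_table(fn ->
      case :ets.lookup(table_name(client_name), {topic, producer_name, access_mode}) do
        [{_key, epoch}] -> {:ok, epoch}
        [] -> :error
      end
    end)
  end

  def delete(client_name, topic, producer_name, access_mode) do
    with_table(fn ->
      :ets.delete(table_name(client_name), {topic, producer_name, access_mode})
      :ok
    end)
  end

  # ETS raises ArgumentError when a named table does not exist;
  # the sketch maps that to :error.
  defp with_table(fun) do
    fun.()
  rescue
    ArgumentError -> :error
  end
end

# Round trip against the sketch (not the real module).
:ok = EpochStoreSketch.init(:default)
:ok = EpochStoreSketch.put(:default, "my-topic", "my-producer", :Exclusive, 5)
{:ok, 5} = EpochStoreSketch.get(:default, "my-topic", "my-producer", :Exclusive)
:ok = EpochStoreSketch.delete(:default, "my-topic", "my-producer", :Exclusive)
:error = EpochStoreSketch.get(:default, "my-topic", "my-producer", :Exclusive)
```

Because an ETS table is owned by the process that creates it and disappears when that process exits, tying the table to a client process would give the "cleaned up when the client stops" behaviour without extra code. Whether the library relies on this is not stated here.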

Summary

Functions

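delete(client_name, topic, producer_name, access_mode)

Deletes the stored epoch for a producer.

get(client_name, topic, producer_name, access_mode)

Retrieves the stored epoch for a producer.

init(client_name)

Initializes the epoch store for a client.

put(client_name, topic, producer_name, access_mode, epoch)

Stores the epoch for a producer.

table_name(client_name)

Returns the ETS table name for a given client.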

Functions

delete(client_name, topic, producer_name, access_mode)

@spec delete(atom(), String.t(), String.t(), atom()) :: :ok | :error

Deletes the stored epoch for a producer.

Parameters

  • client_name - The name of the client
  • topic - The topic name
  • producer_name - The producer name
  • access_mode - The producer access mode

Returns

:ok if successful, :error if the table doesn't exist

Examples

iex> ProducerEpochStore.delete(:default, "my-topic", "my-producer", :Exclusive)
:ok

get(client_name, topic, producer_name, access_mode)

@spec get(atom(), String.t(), String.t(), atom()) :: {:ok, integer()} | :error

Retrieves the stored epoch for a producer.

Parameters

  • client_name - The name of the client
  • topic - The topic name
  • producer_name - The producer name
  • access_mode - The producer access mode

Returns

  • {:ok, epoch} if an epoch is found
  • :error if no epoch is stored

Examples

iex> ProducerEpochStore.get(:default, "my-topic", "my-producer", :Exclusive)
{:ok, 5}

iex> ProducerEpochStore.get(:default, "new-topic", "new-producer", :Shared)
:error
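Since get/4 returns either {:ok, epoch} or :error, callers will typically pattern match on both shapes. The sketch below shows one way a caller might fold that result into the options for a producer (re)connect. The module `ReconnectSketch`, the function `producer_opts/2`, and the `:topic_epoch` option key are hypothetical names chosen for illustration, not part of this library's API.

```elixir
defmodule ReconnectSketch do
  # Hypothetical helper; the :topic_epoch option key is an assumption.

  @doc "Adds the stored epoch to the connect options when one exists."
  def producer_opts(base_opts, {:ok, epoch}) when is_integer(epoch) do
    Keyword.put(base_opts, :topic_epoch, epoch)
  end

  # No stored epoch (for example, a first connection): leave the options untouched.
  def producer_opts(base_opts, :error), do: base_opts
end

opts = [access_mode: :ExclusiveWithFencing]

# A previously stored epoch is carried over into the options.
with_epoch = ReconnectSketch.producer_opts(opts, {:ok, 5})
5 = Keyword.get(with_epoch, :topic_epoch)

# Without a stored epoch, the options come back unchanged.
^opts = ReconnectSketch.producer_opts(opts, :error)
```

Matching on :error explicitly, instead of using a catch-all clause, means an unexpected return shape fails loudly with a FunctionClauseError.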

init(client_name)

@spec init(atom()) :: :ok

Initializes the epoch store for a client.

Creates the ETS table if it doesn't already exist. This is called when a Pulsar client starts.

Parameters

  • client_name - The name of the client

Returns

:ok

put(client_name, topic, producer_name, access_mode, epoch)

@spec put(atom(), String.t(), String.t(), atom(), integer()) :: :ok | :error

Stores the epoch for a producer.

Parameters

  • client_name - The name of the client
  • topic - The topic name
  • producer_name - The producer name
  • access_mode - The producer access mode
  • epoch - The topic epoch to store

Returns

:ok if successful, :error if the table doesn't exist

Examples

iex> ProducerEpochStore.put(:default, "my-topic", "my-producer", :Exclusive, 5)
:ok

table_name(client_name)

@spec table_name(atom()) :: atom()

Returns the ETS table name for a given client.