Vdr.TS.Watch (veidrodelis v0.1.6)

Watch storage for tracking key-based subscriptions with database scoping.

Lets multiple processes register watches on specific keys in specific databases, associating each watch with a reference value. Watches can be looked up by database and key, and removed either one at a time or all at once for a given pid.

Internal Structure

The watch storage uses two collections:

  1. key_to_pids: %{db => %{key => %{pid => ref}}} - for each database, maps each watched key to the pids watching it and the ref each pid registered
  2. pid_to_keys: %{pid => MapSet.t({db, key})} - maps each pid to the set of {db, key} pairs it watches
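To illustrate how the two collections could be kept in sync, here is a minimal sketch built on plain maps. The module name WatchSketch and its functions are hypothetical and are not the library's implementation; they only mirror the shapes listed above.

```elixir
defmodule WatchSketch do
  # Hypothetical stand-in that mirrors the two collections described above.
  # It is not the Vdr.TS.Watch implementation.
  defstruct key_to_pids: %{}, pid_to_keys: %{}

  def new, do: %__MODULE__{}

  # Registers pid for {db, key}, updating both indexes together.
  def add(%__MODULE__{} = w, pid, db, key, ref) do
    keys = Map.get(w.pid_to_keys, pid, MapSet.new())

    if MapSet.member?(keys, {db, key}) do
      {:error, :already_registered}
    else
      db_map = Map.get(w.key_to_pids, db, %{})
      pid_map = db_map |> Map.get(key, %{}) |> Map.put(pid, ref)

      {:ok,
       %{
         w
         | key_to_pids: Map.put(w.key_to_pids, db, Map.put(db_map, key, pid_map)),
           pid_to_keys: Map.put(w.pid_to_keys, pid, MapSet.put(keys, {db, key}))
       }}
    end
  end

  # Forward index: which processes watch {db, key}? Returns {ref, pid} tuples.
  def lookup(%__MODULE__{} = w, db, key) do
    w.key_to_pids
    |> Map.get(db, %{})
    |> Map.get(key, %{})
    |> Enum.map(fn {pid, ref} -> {ref, pid} end)
  end
end

# The same key in two databases is stored as two independent watches.
{:ok, w} = WatchSketch.add(WatchSketch.new(), self(), 0, "user:123", :my_ref)
{:ok, w} = WatchSketch.add(w, self(), 1, "user:123", :other_ref)
IO.inspect(WatchSketch.lookup(w, 0, "user:123"))
```

In this sketch the forward index answers "who watches this key?" in a few map lookups, while the reverse index lets a duplicate registration be detected without walking the nested map.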

Example

watch = Vdr.TS.Watch.create()

# Add watches (with database parameter)
{:ok, watch} = Vdr.TS.Watch.add(watch, self(), 0, "user:123", :my_ref)
{:ok, watch} = Vdr.TS.Watch.add(watch, self(), 0, "user:456", :another_ref)

# Lookup
[{:my_ref, _pid}] = Vdr.TS.Watch.lookup(watch, 0, "user:123")

# Delete single watch
{:ok, watch, _remaining} = Vdr.TS.Watch.delete(watch, self(), 0, "user:123")

# Delete all watches for a pid
watch = Vdr.TS.Watch.delete_all(watch, self())

Summary

Functions

add(watch, pid, db, key, ref)

Adds a watch entry for the given pid, database, key, and ref.

all_watchers(watch)

Returns all unique {pid, ref} pairs across all watches.

create()

Creates a new empty watch storage.

delete(watch, pid, db, key)

Deletes a single watch entry for the given pid, database, and key.

delete_all(watch, pid)

Deletes all watch entries for the given pid.

lookup(watch, db, key)

Looks up all watches for the given database and key.

lookup_by_db(watch, db)

Returns all unique {ref, pid} pairs for watches in a specific database.

Types

db_key()

@type db_key() :: {non_neg_integer(), String.t()}

t()

@type t() :: %Vdr.TS.Watch{
  key_to_pids: %{
    required(non_neg_integer()) => %{
      required(String.t()) => %{required(pid()) => term()}
    }
  },
  pid_to_keys: %{required(pid()) => MapSet.t(db_key())}
}

Functions

add(watch, pid, db, key, ref)

@spec add(t(), pid(), non_neg_integer(), String.t(), term()) ::
  {:ok, t()} | {:error, atom()}

Adds a watch entry for the given pid, database, key, and ref.

Returns {:ok, updated_watch} on success, or {:error, reason} if the key is already registered for this pid in this database.

Parameters

  • watch - The watch storage
  • pid - The process identifier
  • db - The database number
  • key - The key to watch (string)
  • ref - The reference value to associate with this watch

Examples

iex> watch = Vdr.TS.Watch.create()
iex> {:ok, watch} = Vdr.TS.Watch.add(watch, self(), 0, "key1", :ref1)
iex> {:error, :already_registered} = Vdr.TS.Watch.add(watch, self(), 0, "key1", :ref2)

all_watchers(watch)

@spec all_watchers(t()) :: [{pid(), term()}]

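Returns all unique {pid, ref} pairs across all watches. Per the specs, the tuple order here is the reverse of the {ref, pid} tuples returned by lookup/3 and lookup_by_db/2.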

This is useful for broadcasting messages to all watchers, such as sending Init messages when streaming mode starts.

Parameters

  • watch - The watch storage

Examples

iex> watch = Vdr.TS.Watch.create()
iex> {:ok, watch} = Vdr.TS.Watch.add(watch, self(), 0, "key1", :ref1)
iex> {:ok, watch} = Vdr.TS.Watch.add(watch, self(), 0, "key2", :ref2)
iex> watchers = Vdr.TS.Watch.all_watchers(watch)
iex> length(watchers) == 2
true
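As a rough illustration of the broadcasting use case, the sketch below sends one message to each watcher in a list shaped like the result of all_watchers/1. The module BroadcastSketch and the {:init, ref} message shape are assumptions made for this example; the actual Init message format is not specified here.

```elixir
defmodule BroadcastSketch do
  # Sends {:init, ref} to every watcher. The message shape is an assumption
  # for illustration; the real Init message may look different.
  def broadcast_init(watchers) do
    Enum.each(watchers, fn {pid, ref} -> send(pid, {:init, ref}) end)
  end
end

# `watchers` stands in for the list returned by Vdr.TS.Watch.all_watchers/1.
watchers = [{self(), :ref1}, {self(), :ref2}]
BroadcastSketch.broadcast_init(watchers)
```

Because the list holds unique {pid, ref} pairs, a process that watches several keys under the same ref would receive the message once in this sketch.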

create()

@spec create() :: t()

Creates a new empty watch storage.

Examples

iex> watch = Vdr.TS.Watch.create()
iex> is_struct(watch, Vdr.TS.Watch)
true

delete(watch, pid, db, key)

@spec delete(t(), pid(), non_neg_integer(), String.t()) ::
  {:ok, t(), non_neg_integer()} | {:error, atom()}

Deletes a single watch entry for the given pid, database, and key.

Returns {:ok, updated_watch, remaining_count} on success, where remaining_count is the number of watches remaining for the pid. Returns {:error, reason} if the watch entry does not exist.

Parameters

  • watch - The watch storage
  • pid - The process identifier
  • db - The database number
  • key - The key to unwatch

Examples

iex> watch = Vdr.TS.Watch.create()
iex> {:ok, watch} = Vdr.TS.Watch.add(watch, self(), 0, "key1", :ref1)
iex> {:ok, watch, 0} = Vdr.TS.Watch.delete(watch, self(), 0, "key1")
iex> {:error, :not_found} = Vdr.TS.Watch.delete(watch, self(), 0, "key1")
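One way a caller might act on remaining_count is sketched below: once it reaches zero, the pid has no watches left, so any per-process bookkeeping can be dropped. The module UnwatchSketch and the atoms it returns are hypothetical labels for this example, not part of the library.

```elixir
defmodule UnwatchSketch do
  # Maps the documented return shapes of delete/4 to a caller-side decision.
  # The returned atoms are hypothetical labels used only in this sketch.
  def next_step({:ok, _watch, 0}), do: :stop_monitoring
  def next_step({:ok, _watch, remaining}) when remaining > 0, do: :keep_monitoring
  def next_step({:error, _reason}), do: :ignore
end

# The first tuple element would normally be the updated watch storage;
# an empty map is used here as a placeholder.
IO.inspect(UnwatchSketch.next_step({:ok, %{}, 0}))
IO.inspect(UnwatchSketch.next_step({:ok, %{}, 3}))
IO.inspect(UnwatchSketch.next_step({:error, :not_found}))
```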

delete_all(watch, pid)

@spec delete_all(t(), pid()) :: t()

Deletes all watch entries for the given pid.

Removes the pid's watches across all databases and returns the updated watch storage. This operation always succeeds, even if the pid has no watches.

Parameters

  • watch - The watch storage
  • pid - The process identifier

Examples

iex> watch = Vdr.TS.Watch.create()
iex> {:ok, watch} = Vdr.TS.Watch.add(watch, self(), 0, "key1", :ref1)
iex> {:ok, watch} = Vdr.TS.Watch.add(watch, self(), 1, "key2", :ref2)
iex> watch = Vdr.TS.Watch.delete_all(watch, self())
iex> [] = Vdr.TS.Watch.lookup(watch, 0, "key1")
iex> [] = Vdr.TS.Watch.lookup(watch, 1, "key2")
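The reverse index is what makes a per-pid cleanup cheap. The sketch below shows one way such a cleanup could work on two plain maps shaped like key_to_pids and pid_to_keys. DeleteAllSketch is a hypothetical helper and may differ from what the library does internally.

```elixir
defmodule DeleteAllSketch do
  # Hypothetical helper over two plain maps; not the library's code.
  def delete_all(key_to_pids, pid_to_keys, pid) do
    # The reverse index tells us exactly which {db, key} entries to touch.
    {keys, pid_to_keys} = Map.pop(pid_to_keys, pid, MapSet.new())

    key_to_pids =
      Enum.reduce(keys, key_to_pids, fn {db, key}, acc ->
        # Remove only this pid's entry under db/key; other watchers stay.
        {_ref, acc} = pop_in(acc, [db, key, pid])
        acc
      end)

    # For brevity, inner maps that become empty are left in place.
    {key_to_pids, pid_to_keys}
  end
end

other = spawn(fn -> :ok end)

key_to_pids = %{
  0 => %{"key1" => %{self() => :ref1, other => :ref3}},
  1 => %{"key2" => %{self() => :ref2}}
}

pid_to_keys = %{
  self() => MapSet.new([{0, "key1"}, {1, "key2"}]),
  other => MapSet.new([{0, "key1"}])
}

{key_to_pids, pid_to_keys} = DeleteAllSketch.delete_all(key_to_pids, pid_to_keys, self())
IO.inspect(key_to_pids)
IO.inspect(pid_to_keys)
```

A pid with no entry in the reverse index yields an empty set, so the reduce does nothing and both maps come back unchanged, which matches the "always succeeds" behavior described above.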

lookup(watch, db, key)

@spec lookup(t(), non_neg_integer(), String.t()) :: [{term(), pid()}]

Looks up all watches for the given database and key.

Returns a list of {ref, pid} tuples for all processes watching the key in the specified database.

Parameters

  • watch - The watch storage
  • db - The database number
  • key - The key to look up

Examples

iex> watch = Vdr.TS.Watch.create()
iex> {:ok, watch} = Vdr.TS.Watch.add(watch, self(), 0, "key1", :ref1)
iex> [{:ref1, _pid}] = Vdr.TS.Watch.lookup(watch, 0, "key1")
iex> [] = Vdr.TS.Watch.lookup(watch, 0, "nonexistent")

lookup_by_db(watch, db)

@spec lookup_by_db(t(), non_neg_integer()) :: [{term(), pid()}]

Returns all unique {ref, pid} pairs for watches in a specific database.

This is useful for database-wide operations like FLUSHDB.

Parameters

  • watch - The watch storage
  • db - The database number

Examples

iex> watch = Vdr.TS.Watch.create()
iex> {:ok, watch} = Vdr.TS.Watch.add(watch, self(), 0, "key1", :ref1)
iex> {:ok, watch} = Vdr.TS.Watch.add(watch, self(), 1, "key2", :ref2)
iex> watchers = Vdr.TS.Watch.lookup_by_db(watch, 0)
iex> length(watchers) == 1
true
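To make "unique {ref, pid} pairs" concrete, the sketch below collects them from a plain map shaped like key_to_pids. ByDbSketch is a hypothetical helper, and the deduplication step reflects one reading of "unique": a pid that watches several keys in the database under the same ref is reported once.

```elixir
defmodule ByDbSketch do
  # Collects unique {ref, pid} pairs for one database from a map shaped like
  # key_to_pids. Hypothetical helper, not the library's implementation.
  def lookup_by_db(key_to_pids, db) do
    key_to_pids
    |> Map.get(db, %{})
    |> Enum.flat_map(fn {_key, pids} ->
      Enum.map(pids, fn {pid, ref} -> {ref, pid} end)
    end)
    |> Enum.uniq()
  end
end

key_to_pids = %{
  0 => %{"a" => %{self() => :r}, "b" => %{self() => :r}},
  1 => %{"c" => %{self() => :other}}
}

# The same pid and ref appear under two keys in database 0.
IO.inspect(ByDbSketch.lookup_by_db(key_to_pids, 0))
```

A database-wide event such as FLUSHDB could then notify each returned pair once, rather than once per watched key.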