Watch storage for tracking key-based subscriptions with database scoping.
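Allows multiple processes to register watches for specific keys in specific databases, associating each watch with a reference value. The storage is indexed in both directions (by key and by pid), so looking up the watchers of a key and deleting the watches of a pid do not require scanning every entry.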
Internal Structure
The watch storage uses two collections:
key_to_pids: %{db => %{key => %{pid => ref}}} - maps each database to its keys, and each key to a pid-to-ref map
pid_to_keys: %{pid => MapSet.t({db, key})} - maps each pid to the set of database-scoped keys it watches
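The two collections above can be sketched as follows. This is a minimal illustration, not the module's actual implementation: the WatchSketch name and its struct layout are assumptions, and only the map shapes and the :already_registered error are taken from this page.

```elixir
defmodule WatchSketch do
  # Hypothetical stand-in for the real storage; it models only the two
  # documented collections.
  defstruct key_to_pids: %{}, pid_to_keys: %{}

  def create, do: %WatchSketch{}

  # Registers `ref` for `pid` on `{db, key}`, updating both indexes together
  # so they never disagree.
  def add(%WatchSketch{} = w, pid, db, key, ref) do
    keys = Map.get(w.key_to_pids, db, %{})
    pids = Map.get(keys, key, %{})

    if Map.has_key?(pids, pid) do
      {:error, :already_registered}
    else
      # Forward index: db -> key -> pid -> ref
      key_to_pids = Map.put(w.key_to_pids, db, Map.put(keys, key, Map.put(pids, pid, ref)))

      # Reverse index: pid -> set of {db, key}
      pid_to_keys =
        Map.update(w.pid_to_keys, pid, MapSet.new([{db, key}]), &MapSet.put(&1, {db, key}))

      {:ok, %{w | key_to_pids: key_to_pids, pid_to_keys: pid_to_keys}}
    end
  end
end
```

Keeping a reverse index costs one extra update per add, but it lets all of a pid's watches be found without walking every database and key.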
Example
watch = Vdr.TS.Watch.create()
# Add watches (with database parameter)
{:ok, watch} = Vdr.TS.Watch.add(watch, self(), 0, "user:123", :my_ref)
{:ok, watch} = Vdr.TS.Watch.add(watch, self(), 0, "user:456", :another_ref)
# Lookup
[{:my_ref, _pid}] = Vdr.TS.Watch.lookup(watch, 0, "user:123")
# Delete single watch
{:ok, watch, _remaining} = Vdr.TS.Watch.delete(watch, self(), 0, "user:123")
# Delete all watches for a pid
watch = Vdr.TS.Watch.delete_all(watch, self())
Summary
Functions
add(watch, pid, db, key, ref)
Adds a watch entry for the given pid, database, key, and ref.
all_watchers(watch)
Returns all unique {pid, ref} pairs across all watches.
create()
Creates a new empty watch storage.
delete(watch, pid, db, key)
Deletes a single watch entry for the given pid, database, and key.
delete_all(watch, pid)
Deletes all watch entries for the given pid.
lookup(watch, db, key)
Looks up all watches for the given database and key.
lookup_by_db(watch, db)
Returns all unique {ref, pid} pairs for watches in a specific database.
Types
@type db_key() :: {non_neg_integer(), String.t()}
Functions
add(watch, pid, db, key, ref)
Adds a watch entry for the given pid, database, key, and ref.
Returns {:ok, updated_watch} on success, or {:error, :already_registered} if the key
is already registered for this pid in this database.
Parameters
watch - The watch storage
pid - The process identifier
db - The database number
key - The key to watch (string)
ref - The reference value to associate with this watch
Examples
iex> watch = Vdr.TS.Watch.create()
iex> {:ok, watch} = Vdr.TS.Watch.add(watch, self(), 0, "key1", :ref1)
iex> {:error, :already_registered} = Vdr.TS.Watch.add(watch, self(), 0, "key1", :ref2)
all_watchers(watch)
Returns all unique {pid, ref} pairs across all watches.
This is useful for broadcasting messages to all watchers, such as sending Init messages when streaming mode starts.
Parameters
watch - The watch storage
Examples
iex> watch = Vdr.TS.Watch.create()
iex> {:ok, watch} = Vdr.TS.Watch.add(watch, self(), 0, "key1", :ref1)
iex> {:ok, watch} = Vdr.TS.Watch.add(watch, self(), 0, "key2", :ref2)
iex> watchers = Vdr.TS.Watch.all_watchers(watch)
iex> length(watchers) == 2
true
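As a rough illustration of the broadcast use case described above, the sketch below flattens an index of the documented shape into unique {pid, ref} pairs and messages each one. The AllWatchersSketch module and the {:init, ref} message shape are assumptions made for this example, not part of the documented API.

```elixir
defmodule AllWatchersSketch do
  # `key_to_pids` has the documented shape %{db => %{key => %{pid => ref}}}.
  # A pid that watches several keys with the same ref is reported once.
  def all_watchers(key_to_pids) do
    for {_db, keys} <- key_to_pids,
        {_key, pids} <- keys,
        {pid, ref} <- pids,
        uniq: true,
        do: {pid, ref}
  end

  # Sends a hypothetical {:init, ref} message to every unique watcher.
  def broadcast_init(key_to_pids) do
    for {pid, ref} <- all_watchers(key_to_pids), do: send(pid, {:init, ref})
    :ok
  end
end
```

Deduplicating before sending means a process that watches many keys under one ref receives a single message rather than one per key.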
@spec create() :: t()
Creates a new empty watch storage.
Examples
iex> watch = Vdr.TS.Watch.create()
iex> is_struct(watch, Vdr.TS.Watch)
true
@spec delete(t(), pid(), non_neg_integer(), String.t()) :: {:ok, t(), non_neg_integer()} | {:error, atom()}
Deletes a single watch entry for the given pid, database, and key.
Returns {:ok, updated_watch, remaining_count} on success, where remaining_count
is the number of watches the pid still holds after the deletion, counted across all
databases. Returns {:error, :not_found} if the watch entry does not exist.
Parameters
watch - The watch storage
pid - The process identifier
db - The database number
key - The key to unwatch
Examples
iex> watch = Vdr.TS.Watch.create()
iex> {:ok, watch} = Vdr.TS.Watch.add(watch, self(), 0, "key1", :ref1)
iex> {:ok, watch, 0} = Vdr.TS.Watch.delete(watch, self(), 0, "key1")
iex> {:error, :not_found} = Vdr.TS.Watch.delete(watch, self(), 0, "key1")
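One way a single deletion could keep both indexes in step is sketched below. It works on a plain map holding the two documented collections. The DeleteSketch name is hypothetical, and pruning empty inner maps is an assumption of this sketch rather than documented behavior.

```elixir
defmodule DeleteSketch do
  # `state` is %{key_to_pids: ..., pid_to_keys: ...} in the documented shapes.
  def delete(state, pid, db, key) do
    keys = Map.get(state.key_to_pids, db, %{})
    pids = Map.get(keys, key, %{})

    if Map.has_key?(pids, pid) do
      # Forward index: drop the pid, then prune levels that became empty
      # (pruning is an assumption of this sketch).
      pids = Map.delete(pids, pid)
      keys = if map_size(pids) == 0, do: Map.delete(keys, key), else: Map.put(keys, key, pids)

      key_to_pids =
        if map_size(keys) == 0,
          do: Map.delete(state.key_to_pids, db),
          else: Map.put(state.key_to_pids, db, keys)

      # Reverse index: the size of what is left is the remaining count.
      watched = state.pid_to_keys |> Map.get(pid, MapSet.new()) |> MapSet.delete({db, key})

      pid_to_keys =
        if MapSet.size(watched) == 0,
          do: Map.delete(state.pid_to_keys, pid),
          else: Map.put(state.pid_to_keys, pid, watched)

      {:ok, %{state | key_to_pids: key_to_pids, pid_to_keys: pid_to_keys}, MapSet.size(watched)}
    else
      {:error, :not_found}
    end
  end
end
```

A caller might use a remaining count of 0 as the signal that the pid no longer needs to be tracked at all.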
delete_all(watch, pid)
Deletes all watch entries for the given pid.
Returns the updated watch storage. This operation always succeeds, even if the pid has no watches. Deletes watches across all databases.
Parameters
watch - The watch storage
pid - The process identifier
Examples
iex> watch = Vdr.TS.Watch.create()
iex> {:ok, watch} = Vdr.TS.Watch.add(watch, self(), 0, "key1", :ref1)
iex> {:ok, watch} = Vdr.TS.Watch.add(watch, self(), 1, "key2", :ref2)
iex> watch = Vdr.TS.Watch.delete_all(watch, self())
iex> [] = Vdr.TS.Watch.lookup(watch, 0, "key1")
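The reverse index is what makes this operation cheap: the pid's own key set says exactly which entries to remove. The sketch below shows the idea on a plain map holding the two documented collections. DeleteAllSketch is a hypothetical name, and for brevity the sketch leaves empty inner maps in place instead of pruning them.

```elixir
defmodule DeleteAllSketch do
  # `state` is %{key_to_pids: ..., pid_to_keys: ...} in the documented shapes.
  def delete_all(state, pid) do
    # Take the pid's key set out of the reverse index; a pid without watches
    # yields an empty set, so the call still succeeds.
    {watched, pid_to_keys} = Map.pop(state.pid_to_keys, pid, MapSet.new())

    key_to_pids =
      Enum.reduce(watched, state.key_to_pids, fn {db, key}, acc ->
        keys = Map.get(acc, db, %{})
        # Drop only this pid's entry; other watchers of the key stay.
        pids = keys |> Map.get(key, %{}) |> Map.delete(pid)
        Map.put(acc, db, Map.put(keys, key, pids))
      end)

    %{state | key_to_pids: key_to_pids, pid_to_keys: pid_to_keys}
  end
end
```

A plausible caller is a process that monitors its watchers and runs this cleanup when it receives a :DOWN message for one of them, though this page does not say how the real module is driven.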
@spec lookup(t(), non_neg_integer(), String.t()) :: [{term(), pid()}]
Looks up all watches for the given database and key.
Returns a list of {ref, pid} tuples for all processes watching the key
in the specified database.
Parameters
watch - The watch storage
db - The database number
key - The key to look up
Examples
iex> watch = Vdr.TS.Watch.create()
iex> {:ok, watch} = Vdr.TS.Watch.add(watch, self(), 0, "key1", :ref1)
iex> [{:ref1, _pid}] = Vdr.TS.Watch.lookup(watch, 0, "key1")
iex> [] = Vdr.TS.Watch.lookup(watch, 0, "nonexistent")
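A lookup of this kind is typically followed by notifying each watcher. The sketch below shows that pattern against an index of the documented shape. The LookupSketch module and the {:key_changed, ref, db, key} message are assumptions for illustration only.

```elixir
defmodule LookupSketch do
  # `key_to_pids` has the documented shape %{db => %{key => %{pid => ref}}}.
  # A missing database or key simply yields an empty list.
  def lookup(key_to_pids, db, key) do
    pids = key_to_pids |> Map.get(db, %{}) |> Map.get(key, %{})
    for {pid, ref} <- pids, do: {ref, pid}
  end

  # Sends a hypothetical change notification to every watcher of {db, key}.
  def notify(key_to_pids, db, key) do
    for {ref, pid} <- lookup(key_to_pids, db, key) do
      send(pid, {:key_changed, ref, db, key})
    end

    :ok
  end
end
```

Because the database is the outermost map key, the same key name in another database is never returned.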
@spec lookup_by_db(t(), non_neg_integer()) :: [{term(), pid()}]
Returns all unique {ref, pid} pairs for watches in a specific database.
This is useful for database-wide operations like FLUSHDB.
Parameters
watch - The watch storage
db - The database number
Examples
iex> watch = Vdr.TS.Watch.create()
iex> {:ok, watch} = Vdr.TS.Watch.add(watch, self(), 0, "key1", :ref1)
iex> {:ok, watch} = Vdr.TS.Watch.add(watch, self(), 1, "key2", :ref2)
iex> watchers = Vdr.TS.Watch.lookup_by_db(watch, 0)
iex> length(watchers) == 1
true
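The database-wide lookup can be pictured as flattening one branch of the forward index. The following is a minimal sketch under that assumption. LookupByDbSketch is a hypothetical name, and the real function may compute its result differently.

```elixir
defmodule LookupByDbSketch do
  # `key_to_pids` has the documented shape %{db => %{key => %{pid => ref}}}.
  # Only the branch for `db` is visited; duplicate {ref, pid} pairs produced
  # by several keys are collapsed into one.
  def lookup_by_db(key_to_pids, db) do
    for {_key, pids} <- Map.get(key_to_pids, db, %{}),
        {pid, ref} <- pids,
        uniq: true,
        do: {ref, pid}
  end
end
```

For an operation such as FLUSHDB, the resulting list identifies every watcher affected by the flush, each listed once.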