Normandy.Coordination.StatefulContext (normandy v0.2.0)

GenServer-backed shared context for multi-agent systems.

StatefulContext is a concurrent key-value store owned by a GenServer process and backed by an ETS table. Writes are serialized through the GenServer, while reads go straight to ETS.

Features

  • Concurrent access from multiple processes
  • Fast reads via ETS (no GenServer bottleneck)
  • Atomic updates via GenServer
  • Optional pub/sub notifications for changes
  • Process supervision compatible
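
The read/write split described above can be sketched with nothing but GenServer and :ets. This is a minimal illustration of the general pattern, not Normandy's actual implementation; the module name SketchStore, the table options, and the {key, value} row layout are all assumptions made for the example.

```elixir
defmodule SketchStore do
  # Hypothetical module illustrating the GenServer + ETS pattern.
  use GenServer

  ## Client API

  def start_link(opts \\ []) do
    GenServer.start_link(__MODULE__, :ok, opts)
  end

  # Writes are serialized through the owning process.
  def put(server, key, value), do: GenServer.call(server, {:put, key, value})

  # The table reference is fetched once; callers can keep it around.
  def table(server), do: GenServer.call(server, :table)

  # Reads hit ETS directly and never enter the GenServer mailbox.
  def get(table, key) do
    case :ets.lookup(table, key) do
      [{^key, value}] -> {:ok, value}
      [] -> {:error, :not_found}
    end
  end

  ## Server callbacks

  @impl true
  def init(:ok) do
    # :protected lets any process read while only the owner writes.
    table = :ets.new(:sketch_store, [:set, :protected, read_concurrency: true])
    {:ok, table}
  end

  @impl true
  def handle_call({:put, key, value}, _from, table) do
    :ets.insert(table, {key, value})
    {:reply, :ok, table}
  end

  def handle_call(:table, _from, table), do: {:reply, table, table}
end
```

Because the table is :protected, a crash in a reader cannot corrupt the data, and a slow writer only delays other writers, never readers.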

Example

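# Alias the module so the short name used in these examples resolves
alias Normandy.Coordination.StatefulContext

# Start context process
{:ok, pid} = StatefulContext.start_link(name: :my_context)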

# Store and retrieve data
:ok = StatefulContext.put(pid, "key", "value")
{:ok, "value"} = StatefulContext.get(pid, "key")

# Use namespaced keys
:ok = StatefulContext.put(pid, {"agent_1", "status"}, "active")

# Subscribe to changes
:ok = StatefulContext.subscribe(pid, self())

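Summary

Functions

child_spec(init_arg)
Returns a specification to start this module under a supervisor.

delete(server, key)
Deletes a key from the context.

get(server, key)
Retrieves a value from the context.

get(server, key, default)
Retrieves a value with a default if not found.

get_table(server)
Returns the ETS table reference for direct access.

has_key?(server, key)
Checks if a key exists in the context.

keys(server)
Returns all keys in the context.

put(server, key, value)
Stores a value in the context.

start_link(opts \\ [])
Starts a StatefulContext GenServer.

subscribe(server, subscriber_pid)
Subscribes a process to change notifications.

to_map(server)
Returns all data in the context as a map.

unsubscribe(server, subscriber_pid)
Unsubscribes a process from change notifications.

update(server, key, initial, fun)
Updates a value using a function.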
Types

key()

@type key() :: String.t() | {String.t(), String.t()}
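
The keys/1 and to_map/1 examples on this page show a key stored as {"agent_1", "status"} coming back as "agent_1:status". A minimal sketch of that flattening, assuming a colon separator, might look like the following. KeySketch and flatten/1 are hypothetical names, and the behavior is inferred from those examples rather than from the library source.

```elixir
defmodule KeySketch do
  # Hypothetical helper; the "namespace:key" form is inferred from the
  # keys/1 and to_map/1 examples, not taken from the library source.
  @spec flatten(String.t() | {String.t(), String.t()}) :: String.t()
  def flatten({namespace, key}) when is_binary(namespace) and is_binary(key) do
    namespace <> ":" <> key
  end

  # Plain string keys pass through unchanged.
  def flatten(key) when is_binary(key), do: key
end
```

If the library does work this way, a plain key that already contains a colon could collide with a namespaced one, which is worth keeping in mind when choosing key names.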

subscriber()

@type subscriber() :: pid()

Functions

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

See Supervisor.
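
As a sketch of how a child specification is typically consumed, the example below starts a GenServer under a supervisor using the {Module, arg} tuple form, which makes the supervisor call Module.child_spec(arg). SketchContext is a hypothetical stand-in so the example runs without the library; with StatefulContext the child entry would presumably be {Normandy.Coordination.StatefulContext, name: :shared_context}, assuming its child_spec/1 forwards the argument to start_link/1 the way the default generated by use GenServer does.

```elixir
defmodule SketchContext do
  # Hypothetical stand-in for a supervised context process.
  use GenServer

  def start_link(opts \\ []) do
    GenServer.start_link(__MODULE__, %{}, opts)
  end

  @impl true
  def init(state), do: {:ok, state}
end

# {Module, arg} asks the supervisor to call Module.child_spec(arg).
children = [{SketchContext, name: :sketch_context}]
{:ok, sup} = Supervisor.start_link(children, strategy: :one_for_one)

# The child is registered under the given name, so other processes can
# reach it without holding its pid.
pid = Process.whereis(:sketch_context)

# The supervisor lists the child under its module name as the id.
running = Supervisor.which_children(sup)
```

Registering a name matters under supervision because a restarted child gets a new pid, while the name stays stable.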

delete(server, key)

@spec delete(GenServer.server(), key()) :: :ok

Deletes a key from the context.

Example

:ok = StatefulContext.delete(pid, "key")

get(server, key)

@spec get(GenServer.server(), key()) :: {:ok, term()} | {:error, :not_found}

Retrieves a value from the context.

Reads directly from ETS for maximum performance (no GenServer call).

Example

{:ok, value} = StatefulContext.get(pid, "key")
{:error, :not_found} = StatefulContext.get(pid, "missing")

get(server, key, default)

@spec get(GenServer.server(), key(), term()) :: term()

Retrieves a value with a default if not found.

Example

value = StatefulContext.get(pid, "key", "default")

get_table(server)

@spec get_table(GenServer.server()) :: :ets.tid()

Returns the ETS table reference for direct access.

Advanced users can use this for custom ETS operations.

Example

table = StatefulContext.get_table(pid)
:ets.lookup(table, "key")
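
The following sketch shows what direct ETS access could look like. It builds its own stand-in table so it runs without the library, and it assumes rows are stored as {key, value} tuples with string keys; the documentation here does not state the row layout, so check it before relying on this against a real context table.

```elixir
# Stand-in table; with the library this reference would come from
# StatefulContext.get_table(pid). The {key, value} row layout is assumed.
table = :ets.new(:sketch_table, [:set, :protected])
:ets.insert(table, [{"key", "value"}, {"agent_1:status", "active"}])

# :ets.lookup/2 returns a list of matching rows, empty when the key is absent.
lookup = fn key ->
  case :ets.lookup(table, key) do
    [{^key, value}] -> {:ok, value}
    [] -> {:error, :not_found}
  end
end

found = lookup.("key")
missing = lookup.("missing")

# Full scan: collect the keys that start with the "agent_1:" prefix.
agent_keys =
  table
  |> :ets.tab2list()
  |> Enum.map(fn {key, _value} -> key end)
  |> Enum.filter(&String.starts_with?(&1, "agent_1:"))
```

Scans such as :ets.tab2list/1 copy every row into the calling process, so they are best kept for small tables or diagnostics.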

has_key?(server, key)

@spec has_key?(GenServer.server(), key()) :: boolean()

Checks if a key exists in the context.

Example

true = StatefulContext.has_key?(pid, "key")

keys(server)

@spec keys(GenServer.server()) :: [String.t()]

Returns all keys in the context. As the example shows, namespaced keys are listed in flattened "namespace:key" form.

Example

keys = StatefulContext.keys(pid)
#=> ["key1", "agent_1:status"]

put(server, key, value)

@spec put(GenServer.server(), key(), term()) :: :ok

Stores a value in the context.

Writes go through GenServer for consistency, but subsequent reads are fast via ETS.

Example

:ok = StatefulContext.put(pid, "key", "value")
:ok = StatefulContext.put(pid, {"agent_1", "data"}, %{result: 42})

start_link(opts \\ [])

@spec start_link(keyword()) :: GenServer.on_start()

Starts a StatefulContext GenServer.

Options

  • :name - Register the process with a name (optional)
  • :notify_on_change - Enable change notifications (default: true)

Example

{:ok, pid} = StatefulContext.start_link(name: :shared_context)

subscribe(server, subscriber_pid)

@spec subscribe(GenServer.server(), subscriber()) :: :ok

Subscribes a process to change notifications.

The subscriber will receive messages of the form: {:context_changed, key, old_value, new_value}

Example

:ok = StatefulContext.subscribe(pid, self())
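
A subscriber needs a receive clause that matches the message shape documented above. The sketch below waits for a single notification with a timeout. ChangeListener and await_change/1 are hypothetical names, and the notification is simulated with send/2 so the example runs on its own; the values "old" and "new" are placeholders.

```elixir
defmodule ChangeListener do
  # Hypothetical helper: waits for one change notification in the
  # documented {:context_changed, key, old_value, new_value} shape.
  def await_change(timeout \\ 1_000) do
    receive do
      {:context_changed, key, old_value, new_value} ->
        {:changed, key, old_value, new_value}
    after
      timeout -> :timeout
    end
  end
end

# Simulated notification; with the library, subscribe/2 followed by a
# write from any process would be expected to deliver one.
send(self(), {:context_changed, "key", "old", "new"})
result = ChangeListener.await_change()
```

In a long-lived subscriber such as a GenServer, the same tuple would be matched in handle_info/2 instead of a bare receive.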

to_map(server)

@spec to_map(GenServer.server()) :: map()

Returns all data in the context as a map.

Example

data = StatefulContext.to_map(pid)
#=> %{"key1" => "value1", "agent_1:status" => "active"}

unsubscribe(server, subscriber_pid)

@spec unsubscribe(GenServer.server(), subscriber()) :: :ok

Unsubscribes a process from change notifications.

Example

:ok = StatefulContext.unsubscribe(pid, self())

update(server, key, initial, fun)

@spec update(GenServer.server(), key(), term(), (term() -> term())) :: :ok

Updates a value using a function.

If the key does not exist, the initial value is used.

Example

:ok = StatefulContext.update(pid, "counter", 0, fn count -> count + 1 end)
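
To illustrate why a function-based update routed through a single process avoids lost updates, the sketch below runs 50 concurrent increments against an Agent that holds a map. The Agent is a stand-in for the context process, not the library's implementation. It uses Map.update/4, which stores the initial value as-is when the key is missing; whether StatefulContext.update/4 treats the initial value the same way is not stated here.

```elixir
# Hypothetical stand-in: an Agent holding a map plays the role of the
# single writer process.
{:ok, agent} = Agent.start_link(fn -> %{} end)

increment = fn ->
  # The whole read-modify-write runs inside the agent process, so
  # concurrent callers cannot interleave.
  Agent.update(agent, fn state ->
    # Map.update/4 stores 1 on the first call, then applies the function.
    Map.update(state, "counter", 1, fn count -> count + 1 end)
  end)
end

# Fire 50 increments from separate processes and wait for all of them.
1..50
|> Enum.map(fn _ -> Task.async(increment) end)
|> Enum.each(&Task.await/1)

count = Agent.get(agent, &Map.fetch!(&1, "counter"))
```

A separate get followed by a put from each task could interleave and drop increments, which is the case the function-based form is meant to avoid.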