View Source ProcessHub.Utility.Bag (ProcessHub v0.5.0-beta)
Utility functions for testing.
Summary
Functions
Returns all messages in the mailbox.
Awaits messages until all expected child_ids have been received.
Awaits cluster join events for the specified nodes.
Awaits cluster leave events for the specified nodes.
Generates child specs for testing.
Retrieves the value associated with a key from a list of key-value tuples.
Sends hook messages to the given process.
Waits and receives multiple messages.
Receives messages until a reducer callback signals completion.
Generates a hook manager for receiving messages.
Returns the current timestamp in the given precision.
Types
@type gen_child_specs_opts() :: [prefix: String.t(), id_type: :string | :atom]
Functions
@spec all_messages() :: [term()]
Returns all messages in the mailbox.
Awaits messages until all expected child_ids have been received.
Useful for hooks like handover_delivered that send %{child_ids: [...]}.
Parameters
receive_key- The message key to matchexpected_child_ids- List of child_ids to wait foropts- Options::timeout- Per-message timeout (default: 10000ms):error_msg- Custom error message on timeout:child_ids_key- Key to extract child_ids from message (default::child_ids)
Returns
List of all received message data.
Example
Bag.await_child_ids(Hook.handover_delivered(), migrated_child_ids, timeout: 60_000)
@spec await_cluster_join([node()] | pos_integer(), Keyword.t()) :: list()
Awaits cluster join events for the specified nodes.
Automatically calculates the expected message count based on:
- Number of nodes joining
- Current cluster size before join
- Hook scope (:global or :local)
Parameters
joining_nodes- List of nodes that are joining (or count as integer)opts- Options::cluster_size- Current cluster size before join (required for :global scope):scope- :local (default) or :global:timeout- Per-message timeout (default: 10000ms)
Examples
# Local scope - one message per joining node
Bag.await_cluster_join(peer_names, scope: :local)
# Global scope - messages from all nodes in cluster
Bag.await_cluster_join(peer_names, scope: :global, cluster_size: 5)
@spec await_cluster_leave([node()] | pos_integer(), Keyword.t()) :: list()
Awaits cluster leave events for the specified nodes.
Automatically calculates the expected message count based on:
- Number of nodes leaving
- Current cluster size before leave
- Hook scope (:global or :local)
Parameters
leaving_nodes- List of nodes that are leaving (or count as integer)opts- Options::cluster_size- Current cluster size BEFORE leave (required for :global scope):scope- :local (default) or :global:timeout- Per-message timeout (default: 10000ms)
@spec gen_child_specs(integer(), gen_child_specs_opts()) :: list()
Generates child specs for testing.
Retrieves the value associated with a key from a list of key-value tuples.
This function searches through a list of {key, value} tuples and returns the value
associated with the first matching key. If no matching key is found, the default
value is returned.
Parameters
list- A list of{key, value}tuples to search throughkey- The key to search fordefault- The value to return if the key is not found (default:nil)
Examples
iex> ProcessHub.Utility.Bag.get_by_key([{:a, 1}, {:b, 2}, {:c, 3}], :b)
2
iex> ProcessHub.Utility.Bag.get_by_key([{:a, 1}, {:b, 2}], :c, :not_found)
:not_found
iex> ProcessHub.Utility.Bag.get_by_key([], :any_key, "default")
"default"
iex> ProcessHub.Utility.Bag.get_by_key([{"key1", "value1"}, {"key2", "value2"}], "key1")
"value1"Notes
- Only searches for exact key matches using strict equality (
===) - Returns the value from the first matching tuple found
- Works with any key/value types (atoms, strings, integers, etc.)
- Gracefully ignores non-tuple elements in the list
- Only considers tuples with at least 2 elements (key-value pairs)
@spec hook_erlang_send( term(), atom() | pid() | port() | reference() | {atom(), node()}, any() ) :: any()
Sends hook messages to the given process.
@spec receive_multiple(pos_integer(), term(), Keyword.t()) :: any()
Waits and receives multiple messages.
@spec receive_until( term(), term(), (term(), term() -> {:cont, term()} | {:halt, term()}), Keyword.t() ) :: term()
Receives messages until a reducer callback signals completion.
The reducer function receives (accumulator, message_data) and returns:
{:cont, new_acc}- Continue waiting for more messages{:halt, result}- Stop and return the result
Parameters
receive_key- The message key to match (atom or {key1, key2} tuple)initial_acc- Initial accumulator valuereducer- Function(acc, msg_data) -> {:cont, new_acc} | {:halt, result}opts- Options::timeout- Per-message timeout (default: 10000ms):error_msg- Custom error message on timeout
Examples
# Wait until all expected child_ids are received
expected_ids = MapSet.new(child_ids)
Bag.receive_until(Hook.handover_delivered(), expected_ids, fn acc, %{child_ids: ids} ->
remaining = MapSet.difference(acc, MapSet.new(ids))
if MapSet.size(remaining) == 0 do
{:halt, :all_received}
else
{:cont, remaining}
end
end)
# Collect all messages until a specific count
Bag.receive_until(:my_hook, [], fn acc, data ->
new_acc = [data | acc]
if length(new_acc) >= 10, do: {:halt, new_acc}, else: {:cont, new_acc}
end)
@spec recv_hook(atom(), pid()) :: ProcessHub.Service.HookManager.t()
Generates a hook manager for receiving messages.
@spec timestamp(:microsecond | :millisecond | :nanosecond | :second | pos_integer()) :: integer()
Returns the current timestamp in the given precision.