ProcessHub.Utility.Bag (ProcessHub v0.5.0-beta)

Utility functions for testing.

Summary

Functions

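all_messages()

Returns all messages in the mailbox.

await_child_ids(receive_key, expected_child_ids, opts \\ [])

Awaits messages until all expected child_ids have been received.

await_cluster_join(joining_nodes, opts \\ [])

Awaits cluster join events for the specified nodes.

await_cluster_leave(leaving_nodes, opts \\ [])

Awaits cluster leave events for the specified nodes.

gen_child_specs(count, opts \\ [])

Generates child specs for testing.

get_by_key(list, key, default \\ nil)

Retrieves the value associated with a key from a list of key-value tuples.

hook_erlang_send(hook_data, pid, msg)

Sends hook messages to the given process.

receive_multiple(x, receive_key, opts \\ [])

Waits for and receives multiple messages.

receive_until(receive_key, initial_acc, reducer, opts \\ [])

Receives messages until a reducer callback signals completion.

recv_hook(key, recv_pid)

Generates a hook manager for receiving messages.

timestamp(precision \\ :second)

Returns the current timestamp in the given precision.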

Types

gen_child_specs_opts()

@type gen_child_specs_opts() :: [prefix: String.t(), id_type: :string | :atom]

Functions

all_messages()

@spec all_messages() :: [term()]

Returns all messages in the mailbox.

await_child_ids(receive_key, expected_child_ids, opts \\ [])

@spec await_child_ids(term(), [term()], Keyword.t()) :: [map()]

Awaits messages until all expected child_ids have been received.

Useful for hooks like handover_delivered that send %{child_ids: [...]}.

Parameters

  • receive_key - The message key to match
  • expected_child_ids - List of child_ids to wait for
  • opts - Options:
    • :timeout - Per-message timeout (default: 10000ms)
    • :error_msg - Custom error message on timeout
    • :child_ids_key - Key to extract child_ids from message (default: :child_ids)

Returns

List of all received message data.

Example

Bag.await_child_ids(Hook.handover_delivered(), migrated_child_ids, timeout: 60_000)

await_cluster_join(joining_nodes, opts \\ [])

@spec await_cluster_join([node()] | pos_integer(), Keyword.t()) :: list()

Awaits cluster join events for the specified nodes.

Automatically calculates the expected message count based on:

  • Number of nodes joining
  • Current cluster size before join
  • Hook scope (:global or :local)

Parameters

  • joining_nodes - List of nodes that are joining (or count as integer)
  • opts - Options:
    • :cluster_size - Current cluster size before join (required for :global scope)
    • :scope - :local (default) or :global
    • :timeout - Per-message timeout (default: 10000ms)

Examples

# Local scope - one message per joining node
Bag.await_cluster_join(peer_names, scope: :local)

# Global scope - messages from all nodes in cluster
Bag.await_cluster_join(peer_names, scope: :global, cluster_size: 5)

await_cluster_leave(leaving_nodes, opts \\ [])

@spec await_cluster_leave([node()] | pos_integer(), Keyword.t()) :: list()

Awaits cluster leave events for the specified nodes.

Automatically calculates the expected message count based on:

  • Number of nodes leaving
  • Current cluster size before leave
  • Hook scope (:global or :local)

Parameters

  • leaving_nodes - List of nodes that are leaving (or count as integer)
  • opts - Options:
    • :cluster_size - Current cluster size BEFORE leave (required for :global scope)
    • :scope - :local (default) or :global
    • :timeout - Per-message timeout (default: 10000ms)

gen_child_specs(count, opts \\ [])

@spec gen_child_specs(integer(), gen_child_specs_opts()) :: list()

Generates child specs for testing.

get_by_key(list, key, default \\ nil)

@spec get_by_key([{any(), any()}], any(), any()) :: any()

Retrieves the value associated with a key from a list of key-value tuples.

This function searches through a list of {key, value} tuples and returns the value associated with the first matching key. If no matching key is found, the default value is returned.

Parameters

  • list - A list of {key, value} tuples to search through
  • key - The key to search for
  • default - The value to return if the key is not found (default: nil)

Examples

iex> ProcessHub.Utility.Bag.get_by_key([{:a, 1}, {:b, 2}, {:c, 3}], :b)
2

iex> ProcessHub.Utility.Bag.get_by_key([{:a, 1}, {:b, 2}], :c, :not_found)
:not_found

iex> ProcessHub.Utility.Bag.get_by_key([], :any_key, "default")
"default"

iex> ProcessHub.Utility.Bag.get_by_key([{"key1", "value1"}, {"key2", "value2"}], "key1")
"value1"

Notes

  • Only searches for exact key matches using strict equality (===)
  • Returns the value from the first matching tuple found
  • Works with any key/value types (atoms, strings, integers, etc.)
  • Gracefully ignores non-tuple elements in the list
  • Only considers tuples with at least 2 elements (key-value pairs)
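
The behaviour listed in the notes can be sketched as a small standalone function. This is a minimal illustration written from the documented behaviour only, not the library's actual implementation; the module name `GetByKeySketch` is hypothetical.

```elixir
defmodule GetByKeySketch do
  # Hypothetical stand-in for Bag.get_by_key/3, based only on the documented notes.
  def get_by_key(list, key, default \\ nil) do
    # Keep only tuples with at least two elements whose first element
    # strictly equals (===) the requested key; skip everything else.
    match =
      Enum.find(list, fn item ->
        is_tuple(item) and tuple_size(item) >= 2 and elem(item, 0) === key
      end)

    case match do
      nil -> default
      tuple -> elem(tuple, 1)
    end
  end
end

# Non-tuple elements and one-element tuples are skipped.
GetByKeySketch.get_by_key([:noise, {:a}, {:a, 1}], :a)
# Strict equality: the integer key 1 does not match the float 1.0.
GetByKeySketch.get_by_key([{1, :int}], 1.0, :not_found)
```

In this sketch the first call returns 1 and the second returns :not_found.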

hook_erlang_send(hook_data, pid, msg)

@spec hook_erlang_send(
  term(),
  atom() | pid() | port() | reference() | {atom(), node()},
  any()
) :: any()

Sends hook messages to the given process.

receive_multiple(x, receive_key, opts \\ [])

@spec receive_multiple(pos_integer(), term(), Keyword.t()) :: any()

Waits for and receives multiple messages.

receive_until(receive_key, initial_acc, reducer, opts \\ [])

@spec receive_until(
  term(),
  term(),
  (term(), term() -> {:cont, term()} | {:halt, term()}),
  Keyword.t()
) :: term()

Receives messages until a reducer callback signals completion.

The reducer function receives (accumulator, message_data) and returns:

  • {:cont, new_acc} - Continue waiting for more messages
  • {:halt, result} - Stop and return the result

Parameters

  • receive_key - The message key to match (atom or {key1, key2} tuple)
  • initial_acc - Initial accumulator value
  • reducer - Function (acc, msg_data) -> {:cont, new_acc} | {:halt, result}
  • opts - Options:
    • :timeout - Per-message timeout (default: 10000ms)
    • :error_msg - Custom error message on timeout

Examples

# Wait until all expected child_ids are received
expected_ids = MapSet.new(child_ids)
Bag.receive_until(Hook.handover_delivered(), expected_ids, fn acc, %{child_ids: ids} ->
  remaining = MapSet.difference(acc, MapSet.new(ids))
  if MapSet.size(remaining) == 0 do
    {:halt, :all_received}
  else
    {:cont, remaining}
  end
end)

# Collect all messages until a specific count
Bag.receive_until(:my_hook, [], fn acc, data ->
  new_acc = [data | acc]
  if length(new_acc) >= 10, do: {:halt, new_acc}, else: {:cont, new_acc}
end)
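
To make the {:cont, ...} / {:halt, ...} contract concrete, here is a minimal standalone sketch of a receive loop driven by a reducer. It is not the library's implementation. It assumes that hook messages arrive as {receive_key, data} tuples and that a timeout raises a RuntimeError; both are assumptions, as are the module name `ReceiveUntilSketch` and the default error text.

```elixir
defmodule ReceiveUntilSketch do
  # Hypothetical illustration of the reducer contract described above.
  # Assumption: messages have the shape {key, data}.
  def receive_until(key, acc, reducer, opts \\ []) do
    timeout = Keyword.get(opts, :timeout, 10_000)
    error_msg = Keyword.get(opts, :error_msg, "timed out waiting for #{inspect(key)}")

    receive do
      {^key, data} ->
        case reducer.(acc, data) do
          # Keep waiting with the updated accumulator.
          {:cont, new_acc} -> receive_until(key, new_acc, reducer, opts)
          # Stop and hand back the final result.
          {:halt, result} -> result
        end
    after
      # The timeout applies to each message individually, not to the whole loop.
      timeout -> raise error_msg
    end
  end
end

# Usage: queue three messages, then collect them until three have arrived.
for n <- 1..3, do: send(self(), {:my_hook, n})

collected =
  ReceiveUntilSketch.receive_until(:my_hook, [], fn acc, data ->
    new_acc = [data | acc]
    if length(new_acc) >= 3, do: {:halt, Enum.reverse(new_acc)}, else: {:cont, new_acc}
  end)
```

Because the timeout restarts after every matching message, the total wait in this sketch can exceed :timeout when messages trickle in slowly.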

recv_hook(key, recv_pid)

@spec recv_hook(atom(), pid()) :: ProcessHub.Service.HookManager.t()

Generates a hook manager for receiving messages.

timestamp(precision \\ :second)

@spec timestamp(:microsecond | :millisecond | :nanosecond | :second | pos_integer()) ::
  integer()

Returns the current timestamp in the given precision.
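
The accepted precision values match Elixir's System.time_unit() type, so a function with this signature can be sketched as a thin wrapper around the standard library. The snippet below is an assumption about the general shape, not the library's source; it uses System.os_time/1, and the real function may rely on a different clock. The module name `TimestampSketch` is hypothetical.

```elixir
defmodule TimestampSketch do
  # Hypothetical wrapper; assumes the OS clock is the intended time source.
  def timestamp(precision \\ :second) do
    System.os_time(precision)
  end
end

seconds = TimestampSketch.timestamp()
millis = TimestampSketch.timestamp(:millisecond)
# An integer precision is interpreted as parts per second, so 1000 behaves like :millisecond.
parts = TimestampSketch.timestamp(1000)
```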