elsa v0.12.3 Elsa.Util

Provides functions for simplifying first-class interactions (consuming and producing) such as connecting to a cluster and establishing a persistent client process for interacting with a cluster.

Link to this section Summary

Functions

Process messages into chunks of size up to the size specified by the calling function in bytes, and determined by the function argument. If no chunk size is specified the default maximum size a chunk will be is approximately 1 megabyte. If no sizing function is provided to construct the appropriately sized chunks, the internal function based on Kernel.byte_size/1 is used.

Determines if client pid is alive

Retrieve the api version of the desired operation supported by the connected cluster.

Return the number of partitions for a given topic. Bypasses the need for a persistent client for lighter weight interactions from one-off calls.

Convert supplied cluster endpoints from common keyword list format to brod-compatible tuple.

Create a named client connection process for managing interactions with the connected cluster.

Retrieves the pid of a brod client process if it exists and executes the given function against the client.

Wrap establishing a connection to a cluster for performing an operation.

Retrieves the appropriate registry for the given value and validates it exists. Executes the function with the registry name if it successfully locates one.

Link to this section Functions

Link to this function

chunk_by_byte_size(collection, chunk_byte_size \\ 900000, byte_size_function \\ &get_byte_size/1)

chunk_by_byte_size(term(), integer(), (... -> any())) :: [term()]

Process messages into chunks of size up to the size specified by the calling function in bytes, and determined by the function argument. If no chunk size is specified the default maximum size a chunk will be is approximately 1 megabyte. If no sizing function is provided to construct the appropriately sized chunks, the internal function based on Kernel.byte_size/1 is used.

Link to this function

client?(pid)

client?(pid() | atom()) :: boolean()

Determines if client pid is alive

Link to this function

get_api_version(connection, api)

get_api_version(pid(), atom()) :: non_neg_integer()

Retrieve the api version of the desired operation supported by the connected cluster.

Link to this function

partition_count(endpoints, topic)

partition_count(keyword() | Elsa.connection() | pid(), String.t()) :: integer()

Return the number of partitions for a given topic. Bypasses the need for a persistent client for lighter weight interactions from one-off calls.

Link to this function

reformat_endpoints(endpoints)

reformat_endpoints(keyword()) :: [{charlist(), integer()}]

Convert supplied cluster endpoints from common keyword list format to brod-compatible tuple.

Link to this function

start_client(endpoints, name, config \\ [])

Create a named client connection process for managing interactions with the connected cluster.

Link to this function

with_client(registry, function)

with_client(atom(), (pid() -> term())) :: term() | {:error, String.t()}

Retrieves the pid of a brod client process if it exists and executes the given function against the client.

Link to this function

with_connection(endpoints, type \\ :any, fun)

with_connection(Elsa.endpoints(), atom(), (... -> any())) :: term()

Wrap establishing a connection to a cluster for performing an operation.

Link to this function

with_registry(connection, function)

with_registry(atom() | String.t(), (atom() -> term())) ::
  term() | {:error, String.t()}

Retrieves the appropriate registry for the given value and validates it exists. Executes the function with the registry name if it successfully locates one.