elsa v0.12.3 Elsa.Group.Manager

Defines the GenServer process that coordinates assignment of workers to topics/partitions of a given consumer group. Tracks consumer group state and reinstantiates workers to the last unacknowledged message in the event of failure.

Types

assignment_received_handler()

assignment_received_handler() ::
  (group(), Elsa.topic(), Elsa.partition(), generation_id() ->
     :ok | {:error, term()})

Function called for each new assignment. It receives the group, topic, partition, and generation ID, and returns :ok or {:error, term()}.

assignments_revoked_handler()

assignments_revoked_handler() :: (() -> :ok)

Function called when assignments have been revoked. It takes no arguments and returns :ok.
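
As a rough illustration of the two handler types above, the following sketch defines anonymous functions whose shapes match the typespecs. The variable names and the logging are hypothetical and not part of Elsa; only the arities and return values come from the specs.

```elixir
require Logger

# Hypothetical handler matching assignment_received_handler():
# takes group, topic, partition and generation_id, returns :ok.
assignment_received_handler = fn group, topic, partition, generation_id ->
  Logger.info("group=#{group} assigned #{topic}/#{partition} (generation #{generation_id})")
  :ok
end

# Hypothetical handler matching assignments_revoked_handler():
# takes no arguments and returns :ok.
assignments_revoked_handler = fn ->
  Logger.info("assignments revoked")
  :ok
end
```

Functions like these would be supplied under the assignment_received_handler and assignments_revoked_handler keys of init_opts().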

begin_offset()

begin_offset() :: non_neg_integer()

The offset from which to begin fetch requests: default = latest

consumer_config()

consumer_config() :: [
  min_bytes: min_bytes(),
  max_bytes: max_bytes(),
  max_wait_time: max_wait_time(),
  sleep_timeout: sleep_timeout(),
  prefetch_count: prefetch_count(),
  prefetch_bytes: prefetch_bytes(),
  begin_offset: begin_offset(),
  offset_reset_policy: offset_reset_policy()
]

Values to configure the consumer; all are optional.
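
A minimal sketch of a consumer_config() keyword list follows. The values mirror the defaults documented for each type on this page; interpreting 1MB as 1_048_576 bytes and 100KB as 102_400 bytes is an assumption, and the offset_reset_policy value is chosen only for illustration since no default is stated here. begin_offset is left out so that its default applies.

```elixir
# Illustrative consumer_config(); every key is optional.
config = [
  min_bytes: 0,              # documented default
  max_bytes: 1_048_576,      # "1MB", assumed to mean 1024 * 1024 bytes
  max_wait_time: 10_000,     # milliseconds
  sleep_timeout: 1_000,      # milliseconds
  prefetch_count: 10,        # messages
  prefetch_bytes: 102_400,   # "100KB", assumed to mean 100 * 1024 bytes
  offset_reset_policy: :reset_to_latest  # illustrative choice, not a documented default
]
```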

generation_id()

generation_id() :: pos_integer()

group()

group() :: String.t()

handler()

handler() :: module()

Module that implements the Elsa.Consumer.MessageHandler behaviour

init_opts()

init_opts() :: [
  connection: Elsa.connection(),
  endpoints: Elsa.endpoints(),
  group: group(),
  topics: [Elsa.topic()],
  assignment_received_handler: assignment_received_handler(),
  assignments_revoked_handler: assignments_revoked_handler(),
  handler: handler(),
  handler_init_args: term(),
  config: consumer_config()
]

Keyword list of configuration values used to start an Elsa consumer.
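
The keys above might be filled in as sketched below. Every concrete value is a hypothetical placeholder: the connection name, the endpoint, the group and topic names, and MyApp.MessageHandler (assumed to implement the Elsa.Consumer.MessageHandler behaviour) are not taken from the source. The shapes used for Elsa.connection() (an atom) and Elsa.endpoints() (a keyword list of host and port) are assumptions as well, since those types are defined elsewhere.

```elixir
# Illustrative init_opts(); all values are placeholders.
init_opts = [
  connection: :my_connection,       # assumed: Elsa.connection() is an atom name
  endpoints: [localhost: 9092],     # assumed: Elsa.endpoints() is a host/port keyword list
  group: "my-consumer-group",
  topics: ["my-topic"],
  handler: MyApp.MessageHandler,    # hypothetical module implementing the handler behaviour
  handler_init_args: [],
  config: [
    begin_offset: 0,
    offset_reset_policy: :reset_to_earliest
  ]
]
```

Per the typespec of start_link/1, a list of this shape is what the group manager accepts when it is started.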

max_bytes()

max_bytes() :: non_neg_integer()

Maximum number of bytes to fetch in a batch of messages: default = 1MB

max_wait_time()

max_wait_time() :: non_neg_integer()

Maximum number of milliseconds to wait for the broker to collect min_bytes of messages: default = 10_000 ms

min_bytes()

min_bytes() :: non_neg_integer()

Minimum number of bytes to fetch in a batch of messages: default = 0

offset_reset_policy()

offset_reset_policy() :: :reset_to_earliest | :reset_to_latest

How to reset begin_offset if an OffsetOutOfRange exception is received

prefetch_bytes()

prefetch_bytes() :: non_neg_integer()

The total number of bytes the consumer is allowed to fetch ahead: default = 100KB

prefetch_count()

prefetch_count() :: non_neg_integer()

The window size (number of messages) the consumer is allowed to fetch ahead: default = 10

sleep_timeout()

sleep_timeout() :: non_neg_integer()

Number of milliseconds the consumer process is allowed to sleep if Kafka replies with an empty message set: default = 1_000 ms

Functions

ack(connection, topic, partition, generation_id, offset)

Provides convenience for backward compatibility with previous versions of Elsa where acking for a consumer group was handled by the Elsa.Group.Manager module.

assignments_received(pid, group_member_id, generation_id, assignments)

assignments_received(GenServer.server(), term(), generation_id(), [tuple()]) ::
  :ok

Trigger the assignment of workers to a given topic and partition

assignments_revoked(pid)

assignments_revoked(GenServer.server()) :: :ok

Trigger deallocation of all workers from the consumer group and stop worker processes.

brod_received_assignment(args \\ [])

(macro)

brod_received_assignment(record, args)

(macro)

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

See Supervisor.

get_committed_offsets(pid, topic)

Callback implementation for :brod_group_member.get_committed_offsets/2.

handle_continue(atom, state)

Callback implementation for GenServer.handle_continue/2.

Callback implementation for GenServer.init/1.

start_link(opts)

start_link(init_opts()) :: GenServer.on_start()

Start the group manager process and register a name with the process registry.