elsa v0.12.3 Elsa.Group.Manager
Defines the GenServer process that coordinates assignment of workers to topics/partitions of a given consumer group. Tracks consumer group state and reinstantiates workers to the last unacknowledged message in the event of failure.
Summary
Types
assignment_received_handler()
Function called for each new assignment
assignments_revoked_handler()
Function called when assignments have been revoked
begin_offset()
The offset from which to begin fetch requests: default = latest
consumer_config()
Values to configure the consumer, all are optional
handler()
Module that implements the Elsa.Consumer.MessageHandler behaviour
init_opts()
Keyword list of config values used to start an Elsa consumer
max_bytes()
Maximum bytes to fetch in a batch of messages: default = 1MB
max_wait_time()
Maximum number of milliseconds to wait for the broker to collect min_bytes of messages: default = 10_000 ms
min_bytes()
Minimum bytes to fetch in a batch of messages: default = 0
offset_reset_policy()
How to reset begin_offset if an OffsetOutOfRange exception is received
prefetch_bytes()
The total number of bytes allowed to fetch ahead: default = 100KB
prefetch_count()
The window size (number of messages) allowed to fetch ahead: default = 10
sleep_timeout()
Allow the consumer process to sleep this many ms if Kafka replied with 'empty' messages: default = 1_000 ms
Functions
Provides convenience for backward compatibility with previous versions of Elsa where acking for a consumer group was handled by the Elsa.Group.Manager module.
Trigger the assignment of workers to a given topic and partition
Trigger deallocation of all workers from the consumer group and stop worker processes.
Returns a specification to start this module under a supervisor.
Callback implementation for :brod_group_member.get_committed_offsets/2.
Callback implementation for GenServer.handle_continue/2.
Callback implementation for GenServer.init/1.
Start the group manager process and register a name with the process registry.
Types
assignment_received_handler()
assignment_received_handler() :: (group(), Elsa.topic(), Elsa.partition(), generation_id() -> :ok | {:error, term()})
Function called for each new assignment
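A minimal sketch of a function matching the arity in the spec above. The module name `AssignmentLogger` and the sample argument values are hypothetical, and the concrete shapes of `group()`, `Elsa.topic()`, `Elsa.partition()` and `generation_id()` are assumptions, which is why the values are passed through `inspect/1`.

```elixir
defmodule AssignmentLogger do
  # Hypothetical module for illustration; not part of Elsa.
  # Takes the four arguments named in the spec and returns :ok.
  def assignment_received(group, topic, partition, generation_id) do
    IO.puts(
      "group=#{inspect(group)} topic=#{inspect(topic)} " <>
        "partition=#{inspect(partition)} generation=#{inspect(generation_id)}"
    )

    :ok
  end
end

# A 4-arity capture that could be supplied as :assignment_received_handler.
handler = &AssignmentLogger.assignment_received/4
```

Returning `{:error, reason}` instead of `:ok` is also allowed by the spec; how the manager reacts to an error is not described here.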
assignments_revoked_handler()
Function called when assignments have been revoked
begin_offset()
The offset from which to begin fetch requests: default = latest
consumer_config()
consumer_config() :: [ min_bytes: min_bytes(), max_bytes: max_bytes(), max_wait_time: max_wait_time(), sleep_timeout: sleep_timeout(), prefetch_count: prefetch_count(), prefetch_bytes: prefetch_bytes(), begin_offset: begin_offset(), offset_reset_policy: offset_reset_policy() ]
Values to configure the consumer, all are optional
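As an illustration, a consumer configuration that spells out the documented defaults might look like the sketch below. The byte values assume that 1MB means 1_048_576 bytes and 100KB means 102_400 bytes. `begin_offset` is left out because its accepted values are not shown in this section.

```elixir
# Illustrative consumer_config(); every key is optional.
consumer_config = [
  min_bytes: 0,
  # assumed reading of "1MB"
  max_bytes: 1_048_576,
  # milliseconds
  max_wait_time: 10_000,
  # milliseconds
  sleep_timeout: 1_000,
  # number of messages allowed to fetch ahead
  prefetch_count: 10,
  # assumed reading of "100KB"
  prefetch_bytes: 102_400,
  offset_reset_policy: :reset_to_latest
]
```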
handler()
Module that implements the Elsa.Consumer.MessageHandler behaviour
init_opts()
init_opts() :: [ connection: Elsa.connection(), endpoints: Elsa.endpoints(), group: group(), topics: [Elsa.topic()], assignment_received_handler: assignment_received_handler(), assignments_revoked_handler: assignments_revoked_handler(), handler: handler(), handler_init_args: term(), config: consumer_config() ]
Keyword list of config values used to start an Elsa consumer
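A sketch of what such a keyword list could look like. All values are hypothetical: the connection name, the endpoint form, the group and topic names, and `MyApp.MessageHandler` are assumptions made for illustration, and which keys are required is not stated in this section.

```elixir
# Illustrative init_opts(); names and values are placeholders.
opts = [
  # assumed: an atom naming the connection
  connection: :my_connection,
  # assumed: host/port pairs in keyword form
  endpoints: [localhost: 9092],
  group: "example-group",
  topics: ["events"],
  # hypothetical module implementing Elsa.Consumer.MessageHandler
  handler: MyApp.MessageHandler,
  handler_init_args: %{},
  # 4-arity function, as in assignment_received_handler()
  assignment_received_handler: fn _group, _topic, _partition, _generation_id -> :ok end,
  # consumer_config() values, all optional
  config: [prefetch_count: 10]
]
```

`assignments_revoked_handler` is omitted because its signature is not shown here.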
max_bytes()
Maximum bytes to fetch in a batch of messages: default = 1MB
max_wait_time()
Maximum number of milliseconds to wait for the broker to collect min_bytes of messages: default = 10_000 ms
min_bytes()
Minimum bytes to fetch in a batch of messages: default = 0
offset_reset_policy()
offset_reset_policy() :: :reset_to_earliest | :reset_to_latest
How to reset begin_offset if OffsetOutOfRange exception is received
prefetch_bytes()
The total number of bytes allowed to fetch ahead: default = 100KB
prefetch_count()
The window size (number of messages) allowed to fetch ahead: default = 10
sleep_timeout()
Allow the consumer process to sleep this many ms if Kafka replied with 'empty' messages: default = 1_000 ms
Functions
ack(connection, topic, partition, generation_id, offset)
Provides convenience for backward compatibility with previous versions of Elsa where acking for a consumer group was handled by the Elsa.Group.Manager module.
assignments_received(pid, group_member_id, generation_id, assignments)
assignments_received(GenServer.server(), term(), generation_id(), [tuple()]) :: :ok
Trigger the assignment of workers to a given topic and partition
Trigger deallocation of all workers from the consumer group and stop worker processes.
child_spec(init_arg)
Returns a specification to start this module under a supervisor.
See Supervisor.
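Because the module exposes `child_spec/1`, it can in principle be listed as a `{module, arg}` child in the usual Supervisor style. The sketch below only builds the child list. The option values are hypothetical, and whether this minimal set of options is enough to start the process is an assumption.

```elixir
# Hypothetical init_opts() values; see the init_opts() type for all keys.
manager_opts = [connection: :my_connection, group: "example-group", topics: ["events"]]

# Standard {module, arg} child tuple; the supervisor would call
# Elsa.Group.Manager.child_spec(manager_opts) when it starts.
children = [
  {Elsa.Group.Manager, manager_opts}
]

# With the elsa dependency available, the list could be handed to a supervisor:
# Supervisor.start_link(children, strategy: :one_for_one)
```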
get_committed_offsets(pid, topic)
Callback implementation for :brod_group_member.get_committed_offsets/2.
handle_continue(atom, state)
Callback implementation for GenServer.handle_continue/2.
init(opts)
Callback implementation for GenServer.init/1.
Start the group manager process and register a name with the process registry.