elsa v0.12.3 Elsa.Supervisor
Top-level supervisor that orchestrates all other components of the Elsa library. Allows for a single point of integration into your application supervision tree and configuration by way of a series of nested keyword lists
Components not needed by a running application (if your application only consumes messages from Kafka and never producers back to it) can be safely omitted from the configuration.
Link to this section Summary
Functions
Returns a specification to start this module under a supervisor.
Callback implementation for Supervisor.init/1
.
Defines a connection for locating the Elsa Registry process.
Starts the top-level Elsa supervisor and links it to the current process. Starts a brod client and a custom process registry by default and then conditionally starts and takes supervision of any brod group-based consumers or producer processes defined.
Link to this section Functions
child_spec(init_arg)
Returns a specification to start this module under a supervisor.
See Supervisor
.
dynamic_supervisor(registry)
init(args)
Callback implementation for Supervisor.init/1
.
Defines a connection for locating the Elsa Registry process.
Starts the top-level Elsa supervisor and links it to the current process. Starts a brod client and a custom process registry by default and then conditionally starts and takes supervision of any brod group-based consumers or producer processes defined.
Options
:endpoints
- Required. Keyword list of kafka brokers. ex.[localhost: 9092]
:connection
- Required. Atom used to track kafka connection.:config
- Optional. Client configuration options passed to brod.:producer
- Optional. Can be a single producer configuration of multiples in a list.:group_consumer
- Optional. Group consumer configuration.:consumer
- Optional. Simple topic consumer configuration.
Producer Config
:topic
- Required. Producer will be started for configured topic.:poll
- Optional. If set to a number in milliseconds, will poll for new partitions and startup producers on the fly.:config
- Optional. Producer configuration options passed tobrod_producer
.
Group Consumer Config
:group
- Required. Name of consumer group.:topics
- Required. List of topics to subscribe to.:handler
- Required. Module that implements Elsa.Consumer.MessageHandler behaviour.:handler_init_args
- Optional. Any args to be passed to init function in handler module.:assignment_received_handler
- Optional. Arity 4 Function that will be called with any partition assignments. Return:ok
to for assignment to be subscribed to. Return{:error, reason}
to stop subscription. Arguments are group, topic, partition, generation_id.:assignments_revoked_handler
- Optional. Zero arity function that will be called when assignments are revoked. All workers will be shutdown before callback is invoked and must return:ok
.:config
- Optional. Consumer configuration options passed tobrod_consumer
.
Consumer Config
:topic
- Required. Topic to subscribe to.:begin_offset
- Required. Where to begin consuming from. Must be either:earliest
,:latest
, or a valid offset integer.:handler
- Required. Module that implementsElsa.Consumer.MessageHandler
behaviour.:partition
- Optional. Topic partition to subscribe to. Ifnil
, will default to all partitions.:handler_init_args
- Optional. Any args to be passed to init function in handler module.:poll
- Optional. If set to number of milliseconds, will poll for new partitions and startup consumers on the fly.
Example
Elsa.Supervisor.start_link([
endpoints: [localhost: 9092],
connection: :conn,
producer: [topic: "topic1"],
consumer: [
topic: "topic2",
partition: 0,
begin_offset: :earliest,
handler: ExampleHandler
],
group_consumer: [
group: "example-group",
topics: ["topic1"],
handler: ExampleHandler,
config: [
begin_offset: :earliest,
offset_reset_policy: :reset_to_earliest
]
]
])