kashka v0.1.0 Kashka.GenConsumer behaviour
Module for starting a consumer process.
Summary
Functions
- child_spec/1: Returns a specification to start this module under a supervisor.
- start_link/1: Starts a consumer process and subscribes it to topics.
- start_link/2: Starts a named consumer process and subscribes it to topics. Options are the same as for start_link/1.
Types
state()
state() :: %Kashka.GenConsumer{
  conn: Kashka.Http.t(),
  instance_id: String.t(),
  internal_state: any(),
  opts: opts()
}
Functions
child_spec/1
Returns a specification to start this module under a supervisor.
See Supervisor.
start_link(opts)
Starts a consumer process and subscribes it to topics. Allowed options are:
- url: URL, or URL with opts
- topics: list of topics (see Kashka.Kafka.subscribe/2)
- consumer_group: consumer group name
- module: module that implements the Kashka.GenConsumer behaviour
- instance_id: consumer instance_id (see Kashka.Kafka.create_consumer/3)
- consumer_opts: consumer opts (see Kashka.Kafka.create_consumer/3)
- records_opts: opts for fetching records (see Kashka.Kafka.get_records/3)
- delete_on_exists: try to delete an existing consumer that has the same instance_id
- retry_on_exists: retry creating the consumer every 5 seconds while a consumer with the same instance_id exists
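As a rough illustration of how these options fit together, the sketch below builds an options list that could be passed to start_link/1. It assumes the options are given as a keyword list. The URL, topic, group name, instance id, and the MyApp.EventsConsumer callback module are placeholders invented for this example. It also assumes that delete_on_exists and retry_on_exists are boolean flags and that consumer_opts and records_opts may be omitted; the list above does not state either point.

```elixir
defmodule MyApp.ConsumerConfig do
  # Hypothetical helper module; it is not part of kashka.
  # Every value below is a placeholder chosen for illustration.
  def opts do
    [
      url: "http://localhost:8082",
      topics: ["events"],
      consumer_group: "my_group",
      # Assumed to be a module implementing the Kashka.GenConsumer behaviour.
      module: MyApp.EventsConsumer,
      instance_id: "my_instance",
      # Assumption: both flags are booleans.
      delete_on_exists: false,
      retry_on_exists: true
    ]
  end
end

# Starting the consumer needs the kashka dependency and a reachable endpoint:
# {:ok, pid} = Kashka.GenConsumer.start_link(MyApp.ConsumerConfig.opts())
```

Keeping the options in one function makes it easy to reuse the same list for a direct start_link/1 call and for a supervisor child.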
start_link(name, opts)
start_link(GenServer.name(), opts()) :: GenServer.on_start()
Starts a named consumer process and subscribes it to topics. Options are the same as for start_link/1.
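One way a named consumer might be placed under a supervisor is with an explicit child spec map that calls start_link/2. This is only a sketch: MyApp.ConsumerSupervisor, the consumer_child_spec/2 helper, and the registered name MyApp.EventsConsumerProcess are hypothetical, and the options list is whatever would otherwise be passed to start_link/1.

```elixir
defmodule MyApp.ConsumerSupervisor do
  # Hypothetical supervisor; it is not part of kashka.
  use Supervisor

  def start_link(gen_consumer_opts) do
    Supervisor.start_link(__MODULE__, gen_consumer_opts, name: __MODULE__)
  end

  # Builds a child spec whose start function is Kashka.GenConsumer.start_link/2.
  def consumer_child_spec(name, gen_consumer_opts) do
    %{
      id: name,
      start: {Kashka.GenConsumer, :start_link, [name, gen_consumer_opts]}
    }
  end

  @impl true
  def init(gen_consumer_opts) do
    # The registered name is a placeholder atom.
    children = [consumer_child_spec(MyApp.EventsConsumerProcess, gen_consumer_opts)]
    Supervisor.init(children, strategy: :one_for_one)
  end
end
```

Using the name as the child id lets several consumers with different names share one supervisor without id clashes.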
Callbacks
handle_message_set(conn, state, message_set)
handle_message_set(conn :: Kashka.Http.t(), state :: any(), message_set :: [map()]) :: {:ok, Kashka.Http.t(), any()}
Invoked on each received batch of records.
Parameters
- conn: open connection to the created consumer
- state: any term returned from init/2 or from the previous handle_message_set/3 call
- message_set: list of received records
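A minimal callback module might look like the sketch below. MyApp.InspectingConsumer is a hypothetical name. The example makes no assumption about the keys inside each record map or about the initial state, so it only inspects each record and returns the connection and state unchanged. It leaves out init/2, which these docs mark as optional.

```elixir
defmodule MyApp.InspectingConsumer do
  # Hypothetical callback module. The behaviour check only takes effect
  # when the kashka dependency is available at compile time.
  @behaviour Kashka.GenConsumer

  # Called for each received batch; message_set is a list of maps.
  @impl true
  def handle_message_set(conn, state, message_set) do
    Enum.each(message_set, fn record ->
      IO.inspect(record, label: "record")
    end)

    # Return the connection and the state to use for the next batch.
    {:ok, conn, state}
  end
end
```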
init(conn, opts)
init(conn :: Kashka.Http.t(), opts :: opts()) :: {:ok, Kashka.Http.t(), any()}
Invoked on process initialization. Optional.
Parameters
- conn: open connection to the created consumer
- opts: the opts passed to start_link/1
Returns the connection and the internal state for handle_message_set/3.
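To show how the state returned by init/2 flows into handle_message_set/3, here is a small sketch that keeps running totals. MyApp.StatsConsumer and the shape of its state map are invented for this example; the callback signatures and return tuples follow the specs above.

```elixir
defmodule MyApp.StatsConsumer do
  # Hypothetical callback module that counts batches and records.
  @behaviour Kashka.GenConsumer

  # Optional callback: set up the initial internal state.
  @impl true
  def init(conn, _opts) do
    {:ok, conn, %{batches: 0, records: 0}}
  end

  # Receives the state from init/2 or from the previous call.
  @impl true
  def handle_message_set(conn, stats, message_set) do
    new_stats = %{
      stats
      | batches: stats.batches + 1,
        records: stats.records + length(message_set)
    }

    {:ok, conn, new_stats}
  end
end
```

Both callbacks return the connection as the second tuple element, so the same connection value is threaded through from one call to the next.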