kashka v0.1.0 Kashka.GenConsumer behaviour

Module for starting a consumer process.

Types

opts()

opts() :: [
  {:url, Kashka.Http.conn()}
  | {:topics, [String.t()]}
  | {:consumer_group, String.t()}
  | {:module, module()}
  | {:instance_id, String.t()}
  | {:consumer_opts, %{}}
  | {:records_opts, %{}}
  | {:delete_on_exists, boolean()}
  | {:retry_on_exists, boolean()}
  | any()
]

state()

state() :: %Kashka.GenConsumer{
  conn: Kashka.Http.t(),
  instance_id: String.t(),
  internal_state: any(),
  opts: opts()
}

Functions

Returns a specification to start this module under a supervisor.

See Supervisor.

start_link/1 starts a consumer process and subscribes it to topics. Allowed options are:

  • url: a URL, or a URL with options
  • topics: list of topics (see Kashka.Kafka.subscribe/2)
  • consumer_group: consumer group name
  • module: module that implements the Kashka.GenConsumer behaviour
  • instance_id: consumer instance_id (see Kashka.Kafka.create_consumer/3)
  • consumer_opts: options for creating the consumer (see Kashka.Kafka.create_consumer/3)
  • records_opts: options for fetching records (see Kashka.Kafka.get_records/3)
  • delete_on_exists: if a consumer with the same instance_id already exists, try to delete it
  • retry_on_exists: if a consumer with the same instance_id already exists, retry creating the consumer every 5 seconds
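
The options above can be collected into a keyword list, sketched below. All values are placeholders: the URL, topic, group, and instance names are invented, MyApp.CountingConsumer stands for a hypothetical callback module, and passing a plain URL string for url is an assumption. The child tuple relies on the standard convention that a supervisor calls child_spec/1 with the given argument.

```elixir
defmodule MyApp.ConsumerSetup do
  # Hypothetical helper module; every value below is a placeholder.
  def consumer_opts do
    [
      url: "http://localhost:8082",      # assumed: a plain URL string is accepted
      topics: ["events"],
      consumer_group: "my_group",
      module: MyApp.CountingConsumer,    # hypothetical callback module
      instance_id: "consumer_1",
      delete_on_exists: false,
      retry_on_exists: true
    ]
  end

  # Entry for a supervisor's children list.
  def child, do: {Kashka.GenConsumer, consumer_opts()}
end
```

With kashka available, this list would typically be passed to Kashka.GenConsumer.start_link/1, or MyApp.ConsumerSetup.child() would be placed in the children list given to Supervisor.start_link/2.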

Starts a named consumer process and subscribes it to topics. Options are the same as for start_link/1.

Callbacks

handle_message_set(conn, state, message_set)

handle_message_set(
  conn :: Kashka.Http.t(),
  state :: any(),
  message_set :: [map()]
) :: {:ok, Kashka.Http.t(), any()}

Invoked on each received records batch.

Parameters

  • conn: an open connection to the created consumer
  • state: any term returned from init/2 or from the previous handle_message_set/3 call
  • message_set: list of received records
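
A minimal sketch of this callback, assuming the state is a plain integer that counts records seen so far. The module name is hypothetical, and the example deliberately does not inspect the record maps, since their keys are not described here.

```elixir
defmodule MyApp.CountingConsumer do
  # Hypothetical callback module. Without kashka in the dependencies this
  # attribute only produces a compile-time warning.
  @behaviour Kashka.GenConsumer

  # Called once per received batch; returns the connection unchanged
  # together with the updated record count.
  def handle_message_set(conn, state, message_set) do
    new_state = state + length(message_set)
    {:ok, conn, new_state}
  end
end
```

Because this sketch defines no init/2, it assumes the consumer is started with an integer state by some other means; in practice it would be paired with an init/2 that returns 0 as the initial state.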

init(conn, opts)

(optional)
init(conn :: Kashka.Http.t(), opts :: opts()) :: {:ok, Kashka.Http.t(), any()}

Invoked on process initialization. Optional.

Parameters

  • conn: an open connection to the created consumer
  • opts: the options passed to start_link/1

Returns the connection and the internal state that is passed to handle_message_set/3.
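
As an illustration, the sketch below builds the initial state in init/2 and updates it on every batch. The module name and the :label option are hypothetical; reading a custom key from opts assumes that extra entries are allowed in the option list, which the any() alternative in opts() suggests but does not guarantee.

```elixir
defmodule MyApp.BatchLogger do
  # Hypothetical callback module implementing both callbacks.
  @behaviour Kashka.GenConsumer

  # Builds the initial state from the options given to start_link/1.
  # :label is a made-up custom option with a default value.
  def init(conn, opts) do
    label = Keyword.get(opts, :label, "consumer")
    {:ok, conn, %{label: label, batches: 0}}
  end

  # Logs the batch size and increments the batch counter kept in the state.
  def handle_message_set(conn, state, message_set) do
    IO.puts("#{state.label}: batch of #{length(message_set)} records")
    {:ok, conn, %{state | batches: state.batches + 1}}
  end
end
```

The state returned from init/2 is the state argument of the first handle_message_set/3 call, and each later call receives the state returned by the previous one.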