View Source KafkaEx.Server behaviour (kafka_ex v0.13.0)

Defines the KafkaEx.Server behaviour that all Kafka API servers must implement. This module also provides some common callback functions that are injected into the servers that use it.

Link to this section Summary

Link to this section Callbacks

Link to this callback

kafka_server_api_versions(state)

View Source
@callback kafka_server_api_versions(state :: KafkaEx.Server.State.t()) ::
  {:reply, reply, new_state}
when reply: term(), new_state: term()
Link to this callback

kafka_server_consumer_group(state)

View Source
@callback kafka_server_consumer_group(state :: KafkaEx.Server.State.t()) ::
  {:reply, reply, new_state}
  | {:reply, reply, new_state, timeout() | :hibernate}
  | {:noreply, new_state}
  | {:noreply, new_state, timeout() | :hibernate}
  | {:stop, reason, reply, new_state}
  | {:stop, reason, new_state}
when reply: term(), new_state: term(), reason: term()
Link to this callback

kafka_server_consumer_group_metadata(state)

View Source
@callback kafka_server_consumer_group_metadata(state :: KafkaEx.Server.State.t()) ::
  {:reply, reply, new_state}
  | {:reply, reply, new_state, timeout() | :hibernate}
  | {:noreply, new_state}
  | {:noreply, new_state, timeout() | :hibernate}
  | {:stop, reason, reply, new_state}
  | {:stop, reason, new_state}
when reply: term(), new_state: term(), reason: term()
Link to this callback

kafka_server_create_topics( list, network_timeout, state )

View Source
@callback kafka_server_create_topics(
  [KafkaEx.Protocol.CreateTopics.TopicRequest.t()],
  network_timeout :: integer(),
  state :: KafkaEx.Server.State.t()
) :: {:reply, reply, new_state}
when reply: term(), new_state: term()
Link to this callback

kafka_server_delete_topics( list, network_timeout, state )

View Source
@callback kafka_server_delete_topics(
  [String.t()],
  network_timeout :: integer(),
  state :: KafkaEx.Server.State.t()
) :: {:reply, reply, new_state}
when reply: term(), new_state: term()
Link to this callback

kafka_server_fetch( fetch_request, state )

View Source
@callback kafka_server_fetch(
  fetch_request :: KafkaEx.Protocol.Fetch.Request.t(),
  state :: KafkaEx.Server.State.t()
) ::
  {:reply, reply, new_state}
  | {:reply, reply, new_state, timeout() | :hibernate}
  | {:noreply, new_state}
  | {:noreply, new_state, timeout() | :hibernate}
  | {:stop, reason, reply, new_state}
  | {:stop, reason, new_state}
when reply: term(), new_state: term(), reason: term()
Link to this callback

kafka_server_heartbeat( t, network_timeout, state )

View Source
@callback kafka_server_heartbeat(
  KafkaEx.Protocol.Heartbeat.Request.t(),
  network_timeout :: integer(),
  state :: KafkaEx.Server.State.t()
) ::
  {:reply, reply, new_state}
  | {:reply, reply, new_state, timeout() | :hibernate}
  | {:noreply, new_state}
  | {:noreply, new_state, timeout() | :hibernate}
  | {:stop, reason, reply, new_state}
  | {:stop, reason, new_state}
when reply: term(), new_state: term(), reason: term()
Link to this callback

kafka_server_init(args)

View Source
@callback kafka_server_init(args :: [term()]) ::
  {:ok, state}
  | {:ok, state, timeout() | :hibernate}
  | :ignore
  | {:stop, reason :: any()}
when state: any()
Link to this callback

kafka_server_join_group( t, network_timeout, state )

View Source
@callback kafka_server_join_group(
  KafkaEx.Protocol.JoinGroup.Request.t(),
  network_timeout :: integer(),
  state :: KafkaEx.Server.State.t()
) ::
  {:reply, reply, new_state}
  | {:reply, reply, new_state, timeout() | :hibernate}
  | {:noreply, new_state}
  | {:noreply, new_state, timeout() | :hibernate}
  | {:stop, reason, reply, new_state}
  | {:stop, reason, new_state}
when reply: term(), new_state: term(), reason: term()
Link to this callback

kafka_server_leave_group( t, network_timeout, state )

View Source
@callback kafka_server_leave_group(
  KafkaEx.Protocol.LeaveGroup.Request.t(),
  network_timeout :: integer(),
  state :: KafkaEx.Server.State.t()
) ::
  {:reply, reply, new_state}
  | {:reply, reply, new_state, timeout() | :hibernate}
  | {:noreply, new_state}
  | {:noreply, new_state, timeout() | :hibernate}
  | {:stop, reason, reply, new_state}
  | {:stop, reason, new_state}
when reply: term(), new_state: term(), reason: term()
Link to this callback

kafka_server_metadata(topic, state)

View Source
@callback kafka_server_metadata(topic :: binary(), state :: KafkaEx.Server.State.t()) ::
  {:reply, reply, new_state}
  | {:reply, reply, new_state, timeout() | :hibernate}
  | {:noreply, new_state}
  | {:noreply, new_state, timeout() | :hibernate}
  | {:stop, reason, reply, new_state}
  | {:stop, reason, new_state}
when reply: term(), new_state: term(), reason: term()
Link to this callback

kafka_server_offset( topic, partition, time, state )

View Source
@callback kafka_server_offset(
  topic :: binary(),
  partition :: integer(),
  time :: :calendar.datetime() | :latest | :earliest,
  state :: KafkaEx.Server.State.t()
) ::
  {:reply, reply, new_state}
  | {:reply, reply, new_state, timeout() | :hibernate}
  | {:noreply, new_state}
  | {:noreply, new_state, timeout() | :hibernate}
  | {:stop, reason, reply, new_state}
  | {:stop, reason, new_state}
when reply: term(), new_state: term(), reason: term()
Link to this callback

kafka_server_offset_commit( request, state )

View Source
@callback kafka_server_offset_commit(
  request :: KafkaEx.Protocol.OffsetCommit.Request.t(),
  state :: KafkaEx.Server.State.t()
) ::
  {:reply, reply, new_state}
  | {:reply, reply, new_state, timeout() | :hibernate}
  | {:noreply, new_state}
  | {:noreply, new_state, timeout() | :hibernate}
  | {:stop, reason, reply, new_state}
  | {:stop, reason, new_state}
when reply: term(), new_state: term(), reason: term()
Link to this callback

kafka_server_offset_fetch( request, state )

View Source
@callback kafka_server_offset_fetch(
  request :: KafkaEx.Protocol.OffsetFetch.Request.t(),
  state :: KafkaEx.Server.State.t()
) ::
  {:reply, reply, new_state}
  | {:reply, reply, new_state, timeout() | :hibernate}
  | {:noreply, new_state}
  | {:noreply, new_state, timeout() | :hibernate}
  | {:stop, reason, reply, new_state}
  | {:stop, reason, new_state}
when reply: term(), new_state: term(), reason: term()
Link to this callback

kafka_server_produce( request, state )

View Source
@callback kafka_server_produce(
  request :: KafkaEx.Protocol.Produce.Request.t(),
  state :: KafkaEx.Server.State.t()
) ::
  {:reply, reply, new_state}
  | {:reply, reply, new_state, timeout() | :hibernate}
  | {:noreply, new_state}
  | {:noreply, new_state, timeout() | :hibernate}
  | {:stop, reason, reply, new_state}
  | {:stop, reason, new_state}
when reply: term(), new_state: term(), reason: term()
Link to this callback

kafka_server_sync_group( t, network_timeout, state )

View Source
@callback kafka_server_sync_group(
  KafkaEx.Protocol.SyncGroup.Request.t(),
  network_timeout :: integer(),
  state :: KafkaEx.Server.State.t()
) ::
  {:reply, reply, new_state}
  | {:reply, reply, new_state, timeout() | :hibernate}
  | {:noreply, new_state}
  | {:noreply, new_state, timeout() | :hibernate}
  | {:stop, reason, reply, new_state}
  | {:stop, reason, new_state}
when reply: term(), new_state: term(), reason: term()
Link to this callback

kafka_server_update_consumer_metadata(state)

View Source
@callback kafka_server_update_consumer_metadata(state :: KafkaEx.Server.State.t()) ::
  {:noreply, new_state}
  | {:noreply, new_state, timeout() | :hibernate}
  | {:stop, reason :: term(), new_state}
when new_state: term()
Link to this callback

kafka_server_update_metadata(state)

View Source
@callback kafka_server_update_metadata(state :: KafkaEx.Server.State.t()) ::
  {:noreply, new_state}
  | {:noreply, new_state, timeout() | :hibernate}
  | {:stop, reason :: term(), new_state}
when new_state: term()