View Source KafkaEx.Client.RequestBuilder (kafka_ex v1.0.0-rc.1)

This module is used to build request for KafkaEx.Client. It's main decision point which protocol to use for building request and what is required version.

Summary

Functions

Builds request for ApiVersions API

Builds request for CreateTopics API

Builds request for Delete Topics API

Builds request for Describe Groups API

Builds request for Fetch API

Builds request for FindCoordinator API

Builds request for Heartbeat API

Builds request for JoinGroup API

Builds request for LeaveGroup API

Builds request for List Offsets API

Builds request for Metadata API

Builds request for Offset Commit API

Builds request for Offset Fetch API

Builds request for Produce API

Builds request for SyncGroup API

Functions

Link to this function

api_versions_request(request_opts, state)

View Source
@spec api_versions_request(Keyword.t(), KafkaEx.Client.State.t()) ::
  {:ok, term()} | {:error, :api_version_no_supported}

Builds request for ApiVersions API

Link to this function

create_topics_request(request_opts, state)

View Source
@spec create_topics_request(Keyword.t(), KafkaEx.Client.State.t()) ::
  {:ok, term()} | {:error, :api_version_no_supported}

Builds request for CreateTopics API

Options

  • topics (required): List of topic configurations, each with:
    • :topic - Topic name (required)
    • :num_partitions - Number of partitions (default: -1 for broker default)
    • :replication_factor - Replication factor (default: -1 for broker default)
    • :replica_assignment - Manual replica assignment (default: [])
    • :config_entries - Topic configuration entries (default: [])
  • timeout (required): Request timeout in milliseconds
  • validate_only (optional, V1+): If true, only validate without creating
  • api_version (optional): API version to use (default: 1)
Link to this function

delete_topics_request(request_opts, state)

View Source
@spec delete_topics_request(Keyword.t(), KafkaEx.Client.State.t()) ::
  {:ok, term()} | {:error, :api_version_no_supported}

Builds request for Delete Topics API

Link to this function

describe_groups_request(request_opts, state)

View Source
@spec describe_groups_request(Keyword.t(), KafkaEx.Client.State.t()) ::
  {:ok, term()} | {:error, :api_version_no_supported}

Builds request for Describe Groups API

Link to this function

fetch_request(request_opts, state)

View Source
@spec fetch_request(Keyword.t(), KafkaEx.Client.State.t()) ::
  {:ok, term()} | {:error, :api_version_no_supported}

Builds request for Fetch API

Version-specific logic is handled by the protocol layer (RequestHelpers). This function simply passes all options through to the protocol.

Link to this function

find_coordinator_request(request_opts, state)

View Source
@spec find_coordinator_request(Keyword.t(), KafkaEx.Client.State.t()) ::
  {:ok, term()} | {:error, :api_version_no_supported}

Builds request for FindCoordinator API

Options

  • group_id (required for V0, optional for V1): The consumer group ID
  • coordinator_key (optional, V1): The key to look up (defaults to group_id)
  • coordinator_type (optional, V1): 0 for group (default), 1 for transaction
  • api_version (optional): API version to use (default: 1)
Link to this function

heartbeat_request(request_opts, state)

View Source
@spec heartbeat_request(Keyword.t(), KafkaEx.Client.State.t()) ::
  {:ok, term()} | {:error, :api_version_no_supported}

Builds request for Heartbeat API

Link to this function

join_group_request(request_opts, state)

View Source
@spec join_group_request(Keyword.t(), KafkaEx.Client.State.t()) ::
  {:ok, term()} | {:error, :api_version_no_supported}

Builds request for JoinGroup API

Version-specific logic is handled by the protocol layer (RequestHelpers). This function simply passes all options through to the protocol.

Link to this function

leave_group_request(request_opts, state)

View Source
@spec leave_group_request(Keyword.t(), KafkaEx.Client.State.t()) ::
  {:ok, term()} | {:error, :api_version_no_supported}

Builds request for LeaveGroup API

Link to this function

lists_offset_request(request_opts, state)

View Source
@spec lists_offset_request(Keyword.t(), KafkaEx.Client.State.t()) ::
  {:ok, term()} | {:error, :api_version_no_supported}

Builds request for List Offsets API

Link to this function

metadata_request(request_opts, state)

View Source
@spec metadata_request(Keyword.t(), KafkaEx.Client.State.t()) ::
  {:ok, term()} | {:error, :api_version_no_supported}

Builds request for Metadata API

Link to this function

offset_commit_request(request_opts, state)

View Source
@spec offset_commit_request(Keyword.t(), KafkaEx.Client.State.t()) ::
  {:ok, term()} | {:error, :api_version_no_supported}

Builds request for Offset Commit API

Version-specific logic is handled by the protocol layer (RequestHelpers). This function simply passes all options through to the protocol.

Link to this function

offset_fetch_request(request_opts, state)

View Source
@spec offset_fetch_request(Keyword.t(), KafkaEx.Client.State.t()) ::
  {:ok, term()} | {:error, :api_version_no_supported}

Builds request for Offset Fetch API

Link to this function

produce_request(request_opts, state)

View Source
@spec produce_request(Keyword.t(), KafkaEx.Client.State.t()) ::
  {:ok, term()} | {:error, :api_version_no_supported}

Builds request for Produce API

Version-specific logic is handled by the protocol layer (RequestHelpers). This function simply passes all options through to the protocol.

Link to this function

sync_group_request(request_opts, state)

View Source
@spec sync_group_request(Keyword.t(), KafkaEx.Client.State.t()) ::
  {:ok, term()} | {:error, :api_version_no_supported}

Builds request for SyncGroup API