View Source brod (brod v3.16.3)

Link to this section Summary

Functions

Connect to consumer group coordinator broker.

Create topic(s) in kafka Return the message body of create_topics, response. See kpro_schema.erl for struct details
Delete topic(s) from kafka Return the message body of delete_topics, response. See kpro_schema.erl for struct details
Describe consumer groups. The given consumer group IDs should be all managed by the coordinator-broker running at the given endpoint. Otherwise error codes will be returned in the result structs. Return describe_groups response body field named groups. See kpro_schema.erl for struct details
Fetch a single message set from the given topic-partition. The first arg can either be an already established connection to leader, or {Endpoints, ConnConfig} so to establish a new connection before fetch.
Fetch a single message set from the given topic-partition. The first arg can either be an already established connection to leader, or {Endpoints, ConnConfig} so to establish a new connection before fetch.
Same as fetch_committed_offsets/3, but works with a started brod_client
Fetch committed offsets for ALL topics in the given consumer group. Return the responses field of the offset_fetch response. See kpro_schema.erl for struct details.
Fold through messages in a partition. Works like lists:foldl/2 but with below stop conditions: * Always return after reach high watermark offset * Return after the given message count limit is reached * Return after the given kafka offset is reached. * Return if the FoldFun returns an {error, Reason} tuple. NOTE: Exceptions from evaluating FoldFun are not caught.
Fetch broker metadata Return the message body of metadata response. See kpro_schema.erl for details
Fetch broker/topic metadata Return the message body of metadata response. See kpro_schema.erl for struct details
Fetch broker/topic metadata Return the message body of metadata response. See kpro_schema.erl for struct details
Get number of partitions for a given topic. The higher level producers may need the partition numbers to find the partition producer pid --- if the number of partitions is not statically configured for them. It is up to the callers how they want to distribute their data (e.g. random, roundrobin or consistent-hashing) to the partitions.

Equivalent to brod_client:get_producer / 3.

List ALL consumer groups in the given kafka cluster. NOTE: Exception if failed to connect any of the coordinator brokers.
List consumer groups in the given group coordinator broker.
Produce one message if Value is a binary or an iolist. Otherwise send a batch, if Value is a (nested) key-value list, or a list of maps. In this case Key is discarded (only the keys in the key-value list are sent to Kafka). The pid should be a partition producer pid, NOT client pid. The return value is a call reference of type call_ref(), so the caller can use it to expect (match) a #brod_produce_reply{result = brod_produce_req_acked} message after the produce request has been acked by Kafka.
Produce one message if Value is a binary or an iolist. Otherwise send a batch if Value is a (nested) key-value list, or a list of maps. In this case Key is used only for partitioning, or discarded if the 3rd argument is a partition number instead of a partitioner callback. This function first looks up the producer pid, then calls produce/3 to do the real work. The return value is a call reference of type call_ref(), so the caller can used it to expect (match) a #brod_produce_reply{result = brod_produce_req_acked} message after the produce request has been acked by Kafka.
Same as produce/3, only the ack is not delivered as a message, instead, the callback is evaluated by producer worker when ack is received from kafka.
Same as produce/5 only the ack is not delivered as a message, instead, the callback is evaluated by producer worker when ack is received from kafka. Return the partition to caller as {ok, Partition} for caller to correlate the callback when the 3rd arg is not a partition number.
Find the partition worker and send message without any ack. NOTE: This call has no back-pressure to the caller, excessive usage may cause BEAM to run out of memory.

Sync version of produce/3

Sync version of produce/5 This function will not return until a response is received from kafka, however if producer is started with required_acks set to 0, this function will return once the messages are buffered in the producer process.
Version of produce_sync/5 that returns the offset assigned by Kafka If producer is started with required_acks set to 0, the offset will be ?BROD_PRODUCE_UNKNOWN_OFFSET.
Resolve semantic offset or timestamp to real offset.
Resolve semantic offset or timestamp to real offset.
Resolve semantic offset or timestamp to real offset.
Start brod application.
Application behaviour callback
Start group_subscriber_v2
Dynamically start a per-topic producer.

See also: brod_producer:start_link/4.

Stop brod application.
Application behaviour callback
Stop a client.

Subscribe to a data stream from the given topic-partition.

Block wait for sent produced request to be acked by kafka.
As sync_produce_request_offset/1, but also returning assigned offset See produce_sync_offset/5.
Unsubscribe the current subscriber. Assuming the subscriber is self().
Unsubscribe the current subscriber.
Unsubscribe the current subscriber. Assuming the subscriber is self().
Unsubscribe the current subscriber.

Link to this section Types

-type batch_input() :: [msg_input()].
-type bootstrap() :: [endpoint()] | {[endpoint()], client_config()}.
-type call_ref() :: #brod_call_ref{}.
-type cg() :: #brod_cg{}.
-type cg_protocol_type() :: binary().
-type client() :: client_id() | pid().
-type client_config() :: brod_client:config().
-type client_id() :: atom().
-type compression() :: no_compression | gzip | snappy.
-type conn_config() :: [{atom(), term()}] | kpro:conn_config().
-type connection() :: kpro:connection().
-type consumer_config() :: brod_consumer:config().
-type consumer_option() ::
    begin_offset | min_bytes | max_bytes | max_wait_time | sleep_timeout | prefetch_count |
    prefetch_bytes | offset_reset_policy | size_stat_window.
-type consumer_options() :: [{consumer_option(), integer()}].
-type endpoint() :: {hostname(), portnum()}.
-type error_code() :: kpro:error_code().
-type fetch_opts() :: kpro:fetch_opts().
-type fold_acc() :: term().
-type fold_fun(Acc) :: fun((message(), Acc) -> {ok, Acc} | {error, any()}).
fold always returns when reaches the high watermark offset. fold also returns when any of the limits is hit.
-type fold_limits() :: #{message_count => pos_integer(), reach_offset => offset()}.
-type fold_result() :: {fold_acc(), OffsetToContinue :: offset(), fold_stop_reason()}.
-type fold_stop_reason() ::
    reached_end_of_partition | reached_message_count_limit | reached_target_offset |
    {error, any()}.
OffsetToContinue: begin offset for the next fold call
-type group_config() :: proplists:proplist().
-type group_generation_id() :: non_neg_integer().
-type group_id() :: kpro:group_id().
-type group_member() :: {group_member_id(), #kafka_group_member_metadata{}}.
-type group_member_id() :: binary().
-type hostname() :: kpro:hostname().
-type key() :: undefined | binary().
-type message() :: kpro:message().
-type message_set() :: #kafka_message_set{}.
-type msg_input() :: kpro:msg_input().
-type msg_ts() :: kpro:msg_ts().
-type offset() :: kpro:offset().
-type offset_time() :: integer() | earliest | latest.
-type partition() :: kpro:partition().
Link to this type

partition_assignment/0

View Source
-type partition_assignment() :: {topic(), [partition()]}.
-type partition_fun() :: fun((topic(), pos_integer(), key(), value()) -> {ok, partition()}).
-type partitioner() :: partition_fun() | random | hash.
-type portnum() :: pos_integer().
-type produce_ack_cb() :: fun((partition(), offset()) -> _).
-type produce_reply() :: #brod_produce_reply{}.
-type produce_result() :: brod_produce_req_buffered | brod_produce_req_acked.
-type producer_config() :: brod_producer:config().
Link to this type

received_assignments/0

View Source
-type received_assignments() :: [#brod_received_assignment{}].
-type topic() :: kpro:topic().
-type topic_config() :: kpro:struct().
-type topic_partition() :: {topic(), partition()}.
-type value() ::
    undefined |
    iodata() |
    {msg_ts(), binary()} |
    [{key(), value()}] |
    [{msg_ts(), key(), value()}] |
    kpro:msg_input() |
    kpro:batch_input().

Link to this section Functions

Link to this function

connect_group_coordinator(BootstrapEndpoints, ConnCfg, GroupId)

View Source
-spec connect_group_coordinator([endpoint()], conn_config(), group_id()) -> {ok, pid()} | {error, any()}.

Connect to consumer group coordinator broker.

Done in steps:
  1. Connect to any of the given bootstrap ednpoints
  2. Send group_coordinator_request to resolve group coordinator endpoint
  3. Connect to the resolved endpoint and return the connection pid
Link to this function

connect_leader(Hosts, Topic, Partition, ConnConfig)

View Source
-spec connect_leader([endpoint()], topic(), partition(), conn_config()) -> {ok, pid()}.
Connect partition leader.
Link to this function

consume_ack(ConsumerPid, Offset)

View Source
-spec consume_ack(pid(), offset()) -> ok | {error, any()}.
Link to this function

consume_ack(Client, Topic, Partition, Offset)

View Source
-spec consume_ack(client(), topic(), partition(), offset()) -> ok | {error, any()}.
Link to this function

create_topics(Hosts, TopicConfigs, RequestConfigs)

View Source
-spec create_topics([endpoint()],
              [topic_config()],
              #{timeout => kpro:int32(), validate_only => boolean()}) ->
                 ok | {ok, kpro:struct()} | {error, any()}.

Equivalent to create_topics(Hosts, TopicsConfigs, RequestConfigs, []).

Link to this function

create_topics(Hosts, TopicConfigs, RequestConfigs, Options)

View Source
-spec create_topics([endpoint()],
              [topic_config()],
              #{timeout => kpro:int32(), validate_only => boolean()},
              conn_config()) ->
                 ok | {ok, kpro:struct()} | {error, any()}.
Create topic(s) in kafka Return the message body of create_topics, response. See kpro_schema.erl for struct details
Link to this function

delete_topics(Hosts, Topics, Timeout)

View Source
-spec delete_topics([endpoint()], [topic()], pos_integer()) -> ok | {error, any()}.

Equivalent to delete_topics(Hosts, Topics, Timeout, []).

Link to this function

delete_topics(Hosts, Topics, Timeout, Options)

View Source
-spec delete_topics([endpoint()], [topic()], pos_integer(), conn_config()) -> ok | {error, any()}.
Delete topic(s) from kafka Return the message body of delete_topics, response. See kpro_schema.erl for struct details
Link to this function

describe_groups(CoordinatorEndpoint, ConnCfg, IDs)

View Source
-spec describe_groups(endpoint(), conn_config(), [group_id()]) -> {ok, [kpro:struct()]} | {error, any()}.
Describe consumer groups. The given consumer group IDs should be all managed by the coordinator-broker running at the given endpoint. Otherwise error codes will be returned in the result structs. Return describe_groups response body field named groups. See kpro_schema.erl for struct details
Link to this function

fetch(ConnOrBootstrap, Topic, Partition, Offset)

View Source
-spec fetch(connection() | client_id() | bootstrap(), topic(), partition(), integer()) ->
         {ok, {HwOffset :: offset(), [message()]}} | {error, any()}.
Fetch a single message set from the given topic-partition. The first arg can either be an already established connection to leader, or {Endpoints, ConnConfig} so to establish a new connection before fetch.
Link to this function

fetch(ConnOrBootstrap, Topic, Partition, Offset, Opts)

View Source
-spec fetch(connection() | client_id() | bootstrap(), topic(), partition(), offset(), fetch_opts()) ->
         {ok, {HwOffset :: offset(), [message()]}} | {error, any()}.
Fetch a single message set from the given topic-partition. The first arg can either be an already established connection to leader, or {Endpoints, ConnConfig} so to establish a new connection before fetch.
Link to this function

fetch(Hosts, Topic, Partition, Offset, MaxWaitTime, MinBytes, MaxBytes)

View Source
This function is deprecated. fetch(Hosts, Topic, Partition, Offset, Wait, MinBytes, MaxBytes, []).
-spec fetch([endpoint()],
      topic(),
      partition(),
      offset(),
      non_neg_integer(),
      non_neg_integer(),
      pos_integer()) ->
         {ok, [message()]} | {error, any()}.
Link to this function

fetch(Hosts, Topic, Partition, Offset, MaxWaitTime, MinBytes, MaxBytes, ConnConfig)

View Source
This function is deprecated. Fetch a single message set from the given topic-partition..
-spec fetch([endpoint()],
      topic(),
      partition(),
      offset(),
      non_neg_integer(),
      non_neg_integer(),
      pos_integer(),
      conn_config()) ->
         {ok, [message()]} | {error, any()}.
Link to this function

fetch_committed_offsets(Client, GroupId)

View Source
-spec fetch_committed_offsets(client(), group_id()) -> {ok, [kpro:struct()]} | {error, any()}.
Same as fetch_committed_offsets/3, but works with a started brod_client
Link to this function

fetch_committed_offsets(BootstrapEndpoints, ConnCfg, GroupId)

View Source
-spec fetch_committed_offsets([endpoint()], conn_config(), group_id()) ->
                           {ok, [kpro:struct()]} | {error, any()}.
Fetch committed offsets for ALL topics in the given consumer group. Return the responses field of the offset_fetch response. See kpro_schema.erl for struct details.
Link to this function

fold(Bootstrap, Topic, Partition, Offset, Opts, Acc, Fun, Limits)

View Source
-spec fold(connection() | client_id() | bootstrap(),
     topic(),
     partition(),
     offset(),
     fetch_opts(),
     Acc,
     fold_fun(Acc),
     fold_limits()) ->
        fold_result()
        when Acc :: fold_acc().
Fold through messages in a partition. Works like lists:foldl/2 but with below stop conditions: * Always return after reach high watermark offset * Return after the given message count limit is reached * Return after the given kafka offset is reached. * Return if the FoldFun returns an {error, Reason} tuple. NOTE: Exceptions from evaluating FoldFun are not caught.
Link to this function

get_consumer(Client, Topic, Partition)

View Source
-spec get_consumer(client(), topic(), partition()) -> {ok, pid()} | {error, Reason}
                when
                    Reason ::
                        client_down |
                        {client_down, any()} |
                        {consumer_down, any()} |
                        {consumer_not_found, topic()} |
                        {consumer_not_found, topic(), partition()}.
-spec get_metadata([endpoint()]) -> {ok, kpro:struct()} | {error, any()}.
Fetch broker metadata Return the message body of metadata response. See kpro_schema.erl for details
Link to this function

get_metadata(Hosts, Topics)

View Source
-spec get_metadata([endpoint()], all | [topic()]) -> {ok, kpro:struct()} | {error, any()}.
Fetch broker/topic metadata Return the message body of metadata response. See kpro_schema.erl for struct details
Link to this function

get_metadata(Hosts, Topics, Options)

View Source
-spec get_metadata([endpoint()], all | [topic()], conn_config()) -> {ok, kpro:struct()} | {error, any()}.
Fetch broker/topic metadata Return the message body of metadata response. See kpro_schema.erl for struct details
Link to this function

get_partitions_count(Client, Topic)

View Source
-spec get_partitions_count(client(), topic()) -> {ok, pos_integer()} | {error, any()}.
Get number of partitions for a given topic. The higher level producers may need the partition numbers to find the partition producer pid --- if the number of partitions is not statically configured for them. It is up to the callers how they want to distribute their data (e.g. random, roundrobin or consistent-hashing) to the partitions.
Link to this function

get_producer(Client, Topic, Partition)

View Source
-spec get_producer(client(), topic(), partition()) -> {ok, pid()} | {error, Reason}
                when
                    Reason ::
                        client_down |
                        {client_down, any()} |
                        {producer_down, any()} |
                        {producer_not_found, topic()} |
                        {producer_not_found, topic(), partition()}.

Equivalent to brod_client:get_producer / 3.

Link to this function

list_all_groups(Endpoints, ConnCfg)

View Source
-spec list_all_groups([endpoint()], conn_config()) -> [{endpoint(), [cg()] | {error, any()}}].
List ALL consumer groups in the given kafka cluster. NOTE: Exception if failed to connect any of the coordinator brokers.
Link to this function

list_groups(CoordinatorEndpoint, ConnCfg)

View Source
-spec list_groups(endpoint(), conn_config()) -> {ok, [cg()]} | {error, any()}.
List consumer groups in the given group coordinator broker.
-spec produce(pid(), value()) -> {ok, call_ref()} | {error, any()}.

Equivalent to produce(Pid, <<>>, Value).

Link to this function

produce(ProducerPid, Key, Value)

View Source
-spec produce(pid(), key(), value()) -> {ok, call_ref()} | {error, any()}.
Produce one message if Value is a binary or an iolist. Otherwise send a batch, if Value is a (nested) key-value list, or a list of maps. In this case Key is discarded (only the keys in the key-value list are sent to Kafka). The pid should be a partition producer pid, NOT client pid. The return value is a call reference of type call_ref(), so the caller can use it to expect (match) a #brod_produce_reply{result = brod_produce_req_acked} message after the produce request has been acked by Kafka.
Link to this function

produce(Client, Topic, Partition, Key, Value)

View Source
-spec produce(client(), topic(), partition() | partitioner(), key(), value()) ->
           {ok, call_ref()} | {error, any()}.
Produce one message if Value is a binary or an iolist. Otherwise send a batch if Value is a (nested) key-value list, or a list of maps. In this case Key is used only for partitioning, or discarded if the 3rd argument is a partition number instead of a partitioner callback. This function first looks up the producer pid, then calls produce/3 to do the real work. The return value is a call reference of type call_ref(), so the caller can used it to expect (match) a #brod_produce_reply{result = brod_produce_req_acked} message after the produce request has been acked by Kafka.
Link to this function

produce_cb(ProducerPid, Key, Value, AckCb)

View Source
-spec produce_cb(pid(), key(), value(), produce_ack_cb()) -> ok | {error, any()}.
Same as produce/3, only the ack is not delivered as a message, instead, the callback is evaluated by producer worker when ack is received from kafka.
Link to this function

produce_cb(Client, Topic, Part, Key, Value, AckCb)

View Source
-spec produce_cb(client(), topic(), partition() | partitioner(), key(), value(), produce_ack_cb()) ->
              ok | {ok, partition()} | {error, any()}.
Same as produce/5 only the ack is not delivered as a message, instead, the callback is evaluated by producer worker when ack is received from kafka. Return the partition to caller as {ok, Partition} for caller to correlate the callback when the 3rd arg is not a partition number.
Link to this function

produce_no_ack(Client, Topic, Part, Key, Value)

View Source
-spec produce_no_ack(client(), topic(), partition() | partitioner(), key(), value()) ->
                  ok | {error, any()}.
Find the partition worker and send message without any ack. NOTE: This call has no back-pressure to the caller, excessive usage may cause BEAM to run out of memory.
Link to this function

produce_sync(Pid, Value)

View Source
-spec produce_sync(pid(), value()) -> ok.

Equivalent to produce_sync(Pid, <<>>, Value).

Same as produce/5 only the ack is not d
Link to this function

produce_sync(Pid, Key, Value)

View Source
-spec produce_sync(pid(), key(), value()) -> ok | {error, any()}.

Sync version of produce/3

This function will not return until the response is received from Kafka. But when producer is started with required_acks set to 0, this function will return once the messages are buffered in the producer process.
Link to this function

produce_sync(Client, Topic, Partition, Key, Value)

View Source
-spec produce_sync(client(), topic(), partition() | partitioner(), key(), value()) ->
                ok | {error, any()}.
Sync version of produce/5 This function will not return until a response is received from kafka, however if producer is started with required_acks set to 0, this function will return once the messages are buffered in the producer process.
Link to this function

produce_sync_offset(Client, Topic, Partition, Key, Value)

View Source
-spec produce_sync_offset(client(), topic(), partition() | partitioner(), key(), value()) ->
                       {ok, offset()} | {error, any()}.
Version of produce_sync/5 that returns the offset assigned by Kafka If producer is started with required_acks set to 0, the offset will be ?BROD_PRODUCE_UNKNOWN_OFFSET.
Link to this function

resolve_offset(Hosts, Topic, Partition)

View Source
-spec resolve_offset([endpoint()], topic(), partition()) -> {ok, offset()} | {error, any()}.

Equivalent to resolve_offset(Hosts, Topic, Partition, latest, 1).

Link to this function

resolve_offset(Hosts, Topic, Partition, Time)

View Source
-spec resolve_offset([endpoint()], topic(), partition(), offset_time()) ->
                  {ok, offset()} | {error, any()}.
Resolve semantic offset or timestamp to real offset.
Link to this function

resolve_offset(Hosts, Topic, Partition, Time, ConnCfg)

View Source
-spec resolve_offset([endpoint()], topic(), partition(), offset_time(), conn_config()) ->
                  {ok, offset()} | {error, any()}.
Resolve semantic offset or timestamp to real offset.
Link to this function

resolve_offset(Hosts, Topic, Partition, Time, ConnCfg, Opts)

View Source
-spec resolve_offset([endpoint()],
               topic(),
               partition(),
               offset_time(),
               conn_config(),
               #{timeout => kpro:int32()}) ->
                  {ok, offset()} | {error, any()}.
Resolve semantic offset or timestamp to real offset.
-spec start() -> ok | no_return().
Start brod application.
Link to this function

start(StartType, StartArgs)

View Source
Application behaviour callback
Link to this function

start_client(BootstrapEndpoints)

View Source
-spec start_client([endpoint()]) -> ok | {error, any()}.

Equivalent to start_client(BootstrapEndpoints, brod_default_client).

Link to this function

start_client(BootstrapEndpoints, ClientId)

View Source
-spec start_client([endpoint()], client_id()) -> ok | {error, any()}.

Equivalent to start_client(BootstrapEndpoints, ClientId, []).

Link to this function

start_client(BootstrapEndpoints, ClientId, Config)

View Source
-spec start_client([endpoint()], client_id(), client_config()) -> ok | {error, any()}.

Start a client.

BootstrapEndpoints: Kafka cluster endpoints, can be any of the brokers in the cluster, which does not necessarily have to be the leader of any partition, e.g. a load-balanced entrypoint to the remote Kafka cluster.

ClientId: Atom to identify the client process.

Config is a proplist, possible values:
  • restart_delay_seconds (optional, default=10)

    How long to wait between attempts to restart brod_client process when it crashes.
  • get_metadata_timeout_seconds (optional, default=5)

    Return {error, timeout} from brod_client:get_xxx calls if responses for APIs such as metadata, find_coordinator are not received in time.
  • reconnect_cool_down_seconds (optional, default=1)

    Delay this configured number of seconds before retrying to establish a new connection to the kafka partition leader.
  • allow_topic_auto_creation (optional, default=true)

    By default, brod respects what is configured in the broker about topic auto-creation. i.e. whether auto.create.topics.enable is set in the broker configuration. However if allow_topic_auto_creation is set to false in client config, brod will avoid sending metadata requests that may cause an auto-creation of the topic regardless of what broker config is.
  • auto_start_producers (optional, default=false)

    If true, brod client will spawn a producer automatically when user is trying to call produce but did not call brod:start_producer explicitly. Can be useful for applications which don't know beforehand which topics they will be working with.
  • default_producer_config (optional, default=[])

    Producer configuration to use when auto_start_producers is true. See brod_producer:start_link/4 for details about producer config

Connection options can be added to the same proplist. See kpro_connection.erl in kafka_protocol for the details:

  • ssl (optional, default=false)

    true | false | ssl:ssl_option()true is translated to [] as ssl:ssl_option() i.e. all default.
  • sasl (optional, default=undefined)

    Credentials for SASL/Plain authentication. {mechanism(), Filename} or {mechanism(), UserName, Password} where mechanism can be atoms: plain (for "PLAIN"), scram_sha_256 (for "SCRAM-SHA-256") or scram_sha_512 (for SCRAM-SHA-512). Filename should be a file consisting two lines, first line is the username and the second line is the password. Both Username and Password should be string() | binary()
  • connect_timeout (optional, default=5000)

    Timeout when trying to connect to an endpoint.
  • request_timeout (optional, default=240000, constraint: >= 1000)

    Timeout when waiting for a response, connection restart when timed out.
  • query_api_versions (optional, default=true)

    Must be set to false to work with kafka versions prior to 0.10, When set to true, at connection start, brod will send a query request to get the broker supported API version ranges. When set to 'false', brod will always use the lowest supported API version when sending requests to kafka. Supported API version ranges can be found in: brod_kafka_apis:supported_versions/1
  • extra_sock_opts (optional, default=[])

    Extra socket options to tune socket performance. e.g. [{sndbuf, 1 bsl 20}]. More info
Link to this function

start_consumer(Client, TopicName, ConsumerConfig)

View Source
-spec start_consumer(client(), topic(), consumer_config()) -> ok | {error, any()}.
Dynamically start a topic consumer.

See also: for details about consumer config..

Link to this function

start_producer(Client, TopicName, ProducerConfig)

View Source
-spec start_producer(client(), topic(), producer_config()) -> ok | {error, any()}.
Dynamically start a per-topic producer.

See also: brod_producer:start_link/4.

-spec stop() -> ok.
Stop brod application.
Application behaviour callback
-spec stop_client(client()) -> ok.
Stop a client.
Link to this function

subscribe(ConsumerPid, SubscriberPid, Options)

View Source
-spec subscribe(pid(), pid(), consumer_options()) -> ok | {error, any()}.
Link to this function

subscribe(Client, SubscriberPid, Topic, Partition, Options)

View Source
-spec subscribe(client(), pid(), topic(), partition(), consumer_options()) ->
             {ok, pid()} | {error, any()}.

Subscribe to a data stream from the given topic-partition.

If {error, Reason} is returned, the caller should perhaps retry later.

{ok, ConsumerPid} is returned on success. The caller may want to monitor the consumer pid and re-subscribe should the ConsumerPid crash.

Upon successful subscription the subscriber process should expect messages of pattern: {ConsumerPid, #kafka_message_set{}} and {ConsumerPid, #kafka_fetch_error{}}.

-include_lib("brod/include/brod.hrl") to access the records.

In case #kafka_fetch_error{} is received the subscriber should re-subscribe itself to resume the data stream.
Link to this function

sync_produce_request(CallRef)

View Source
-spec sync_produce_request(call_ref()) -> ok | {error, Reason :: any()}.
Block wait for sent produced request to be acked by kafka.
Link to this function

sync_produce_request(CallRef, Timeout)

View Source
-spec sync_produce_request(call_ref(), timeout()) -> ok | {error, Reason :: any()}.
Link to this function

sync_produce_request_offset(CallRef)

View Source
-spec sync_produce_request_offset(call_ref()) -> {ok, offset()} | {error, Reason :: any()}.
As sync_produce_request_offset/1, but also returning assigned offset See produce_sync_offset/5.
Link to this function

sync_produce_request_offset(CallRef, Timeout)

View Source
-spec sync_produce_request_offset(call_ref(), timeout()) -> {ok, offset()} | {error, Reason :: any()}.
Link to this function

unsubscribe(ConsumerPid)

View Source
-spec unsubscribe(pid()) -> ok | {error, any()}.
Unsubscribe the current subscriber. Assuming the subscriber is self().
Link to this function

unsubscribe(ConsumerPid, SubscriberPid)

View Source
-spec unsubscribe(pid(), pid()) -> ok | {error, any()}.
Unsubscribe the current subscriber.
Link to this function

unsubscribe(Client, Topic, Partition)

View Source
-spec unsubscribe(client(), topic(), partition()) -> ok | {error, any()}.
Unsubscribe the current subscriber. Assuming the subscriber is self().
Link to this function

unsubscribe(Client, Topic, Partition, SubscriberPid)

View Source
-spec unsubscribe(client(), topic(), partition(), pid()) -> ok | {error, any()}.
Unsubscribe the current subscriber.