brod_producer (brod v3.18.0)

A brod_producer is a gen_server that is responsible for producing messages to a given partition of a given topic.

See the overview for some more information and examples.

Summary


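produce(Pid, Key, Value)
    Produce a message to partition asynchronously.

produce_cb(Pid, Key, Value, AckCb)
    Produce asynchronously. If AckCb is a function, it is evaluated after the produce request has been acked by kafka; otherwise a #brod_produce_reply{result = brod_produce_req_acked} message is sent to the caller.

produce_no_ack(Pid, Key, Value)
    Fire-n-forget, no ack, no back-pressure.

start_link(ClientPid, Topic, Partition, Config)
    Start (link) a partition producer.

stop(Pid)
    Stop the process.

sync_produce_request(CallRef, Timeout)
    Block calling process until it receives an acked reply for the CallRef.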

Types

-type call_ref() :: brod:call_ref().
-type config() :: proplists:proplist().
-type conn() :: kpro:connection().
-type delay_send_ref() :: undefined | {reference(), reference()}.
-type milli_sec() :: non_neg_integer().
-type offset() :: brod:offset().
-type partition() :: brod:partition().
-type state() :: #state{}.
-type topic() :: brod:topic().

Functions

produce(Pid, Key, Value)

-spec produce(pid(), brod:key(), brod:value()) -> {ok, call_ref()} | {error, any()}.

Produce a message to partition asynchronously.

The call blocks until the request has been buffered in the producer worker. The function returns a call reference of type call_ref() so that the caller can use it to expect (match) a #brod_produce_reply{result = brod_produce_req_acked} message after the produce request has been acked by kafka.

produce_cb(Pid, Key, Value, AckCb)

-spec produce_cb(pid(), brod:key(), brod:value(), undefined | brod:produce_ack_cb()) ->
              ok | {ok, call_ref()} | {error, any()}.
Produce asynchronously. If AckCb is a function, it is evaluated after the produce request has been acked by kafka; otherwise a #brod_produce_reply{result = brod_produce_req_acked} message is sent to the caller.
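
The callback variant could be used roughly as follows. This is a minimal sketch, not code from brod itself: the module name `produce_cb_example` and both helper functions are hypothetical, and the callback shape `fun((Partition, Offset) -> _)` is an assumption about `brod:produce_ack_cb()`.

```erlang
-module(produce_cb_example).
-export([ack_cb/1, produce_with_cb/3]).

%% Hypothetical helper: build an ack callback that forwards the acked
%% partition and offset to Parent as a plain message.
%% Assumption: brod:produce_ack_cb() is fun((Partition, Offset) -> _).
ack_cb(Parent) when is_pid(Parent) ->
    fun(Partition, Offset) ->
        Parent ! {acked, Partition, Offset}
    end.

%% Produce asynchronously to a running brod_producer (Producer is its pid).
%% Per the spec, the call may return ok, {ok, CallRef} or {error, Reason}.
produce_with_cb(Producer, Key, Value) ->
    brod_producer:produce_cb(Producer, Key, Value, ack_cb(self())).
```

The sketch keeps the callback to a single message send, so that whichever process evaluates it is not held up.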

produce_no_ack(Pid, Key, Value)

-spec produce_no_ack(pid(), brod:key(), brod:value()) -> ok.
Fire-n-forget, no ack, no back-pressure.

start_link(ClientPid, Topic, Partition, Config)

-spec start_link(pid(), topic(), partition(), config()) -> {ok, pid()}.

Start (link) a partition producer.

Possible configs (passed as a proplist):
  • required_acks (optional, default = -1):

    How many acknowledgements the kafka broker should receive from the clustered replicas before acking the producer.
      0: The broker will not send any response (this is the only case where the broker will not reply to a request).
      1: The leader will wait until the data is written to the local log before sending a response.
      -1: The broker will block until the message is committed by all in-sync replicas before acking.
  • ack_timeout (optional, default = 10000 ms):

    Maximum time in milliseconds the broker can await the receipt of the number of acknowledgements in RequiredAcks. The timeout is not an exact limit on the request time for a few reasons: (1) it does not include network latency, (2) the timer begins at the beginning of the processing of this request so if many requests are queued due to broker overload that wait time will not be included, (3) kafka leader will not terminate a local write so if the local write time exceeds this timeout it will not be respected.
  • partition_buffer_limit (optional, default = 256):

    How many requests (per-partition) can be buffered without blocking the caller. Callers are released (by receiving the 'brod_produce_req_buffered' reply) once the request has been taken into the buffer. After the request has been put on the wire, the caller may expect a 'brod_produce_req_acked' reply when the request is accepted by kafka.
  • partition_onwire_limit (optional, default = 1):

    How many message sets (per-partition) can be sent to kafka broker asynchronously before receiving ACKs from broker.

    NOTE: setting a number greater than 1 may cause messages to be persisted in an order different from the order in which they were produced.
  • max_batch_size (in bytes, optional, default = 1M):

    In case callers are producing faster than brokers can handle (or congestion on wire), try to accumulate small requests into batches as much as possible but not exceeding max_batch_size.

    OBS: If compression is enabled, care should be taken when picking the max batch size, because a compressed batch will be produced as one message and this message might be larger than 'max.message.bytes' in the kafka config (or topic config).
  • max_retries (optional, default = 3):

    If {max_retries, N} is given, the producer retries the produce request up to N times before crashing in case of failures such as the connection being shut down by the remote or exceptions received in the produce response from kafka. The special value N = -1 means "retry indefinitely".
  • retry_backoff_ms (optional, default = 500):

    Time in milli-seconds to sleep before retrying the failed produce request.
  • compression (optional, default = no_compression):

    gzip or snappy to enable compression.
  • max_linger_ms (optional, default = 0):

    Messages are allowed to 'linger' in buffer for this amount of milli-seconds before being sent. Definition of 'linger': A message is in "linger" state when it is allowed to be sent on-wire, but chosen not to (for better batching).

    The default value is 0 for 2 reasons:
    1. Backward compatibility (for 2.x releases)
    2. Not to surprise brod:produce_sync callers
  • max_linger_count (optional, default = 0):

    At most this amount (count not size) of messages are allowed to "linger" in buffer. Messages will be sent regardless of "linger" age when this threshold is hit.

    NOTE: It does not make sense to have this value set larger than partition_buffer_limit
  • produce_req_vsn (optional, default = undefined):

    User-determined produce API version to use, overriding the API version range received from kafka. This is to be used when a topic in a newer kafka version is configured to store an older message format. E.g. when a topic in kafka 0.11 is configured to have message format 0.10, sending a message with headers would result in an unknown_server_error error code.
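
The options above can be collected into a proplist. The following is a minimal sketch that simply spells out the documented defaults; the module name `producer_config_example` and the function `config/0` are hypothetical, and 1048576 assumes that the "1M" default for max_batch_size means 1 MiB.

```erlang
-module(producer_config_example).
-export([config/0]).

%% Hypothetical helper: returns a proplist that could be passed as the
%% Config argument of brod_producer:start_link/4.
%% Values mirror the defaults listed in the documentation above.
-spec config() -> proplists:proplist().
config() ->
    [ {required_acks, -1}            %% wait for all in-sync replicas
    , {ack_timeout, 10000}           %% ms the broker may wait for acks
    , {partition_buffer_limit, 256}  %% requests buffered before callers block
    , {partition_onwire_limit, 1}    %% 1 keeps messages in produce order
    , {max_batch_size, 1048576}      %% bytes; assumes "1M" means 1 MiB
    , {max_retries, 3}               %% -1 would mean retry indefinitely
    , {retry_backoff_ms, 500}        %% ms to sleep before a retry
    , {compression, no_compression}  %% or gzip | snappy
    , {max_linger_ms, 0}             %% no lingering by default
    , {max_linger_count, 0}
    ].
```

The result could then be passed as `brod_producer:start_link(ClientPid, Topic, Partition, producer_config_example:config())`.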

stop(Pid)

-spec stop(pid()) -> ok.
Stop the process.

sync_produce_request(CallRef, Timeout)

-spec sync_produce_request(call_ref(), timeout()) -> {ok, offset()} | {error, Reason}
                        when Reason :: timeout | {producer_down, any()}.

Block calling process until it receives an acked reply for the CallRef.

The caller pid of this function must be the caller of produce/3 in which the call reference was created.
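
A produce followed by a blocking wait might look like the sketch below. The module name `produce_sync_example` and the function `produce_and_wait/4` are hypothetical; only `brod_producer:produce/3` and `brod_producer:sync_produce_request/2` come from the documentation above. Both calls are made from the same process, which is what the caller-pid requirement asks for.

```erlang
-module(produce_sync_example).
-export([produce_and_wait/4]).

%% Hypothetical helper: produce one message and block until kafka acks it
%% or Timeout elapses. Producer is the pid of a running brod_producer.
-spec produce_and_wait(pid(), binary(), binary(), timeout()) ->
        {ok, integer()} | {error, any()}.
produce_and_wait(Producer, Key, Value, Timeout) ->
    case brod_producer:produce(Producer, Key, Value) of
        {ok, CallRef} ->
            %% Per the spec this returns {ok, Offset}, {error, timeout}
            %% or {error, {producer_down, Reason}}.
            brod_producer:sync_produce_request(CallRef, Timeout);
        {error, Reason} ->
            %% The request could not be buffered; pass the error through.
            {error, Reason}
    end.
```

Running this requires a started brod client and a reachable kafka broker, neither of which is set up by the sketch.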