View Source brod_producer (brod v4.0.0)
A brod_producer
is a gen_server
that is responsible for producing messages to a given partition of a given topic.
Summary
Functions
Produce a message to partition asynchronously.
AckCb
is a function otherwise send #brod_produce_reply{result = brod_produce_req_acked}
message to caller after the produce request has been acked by kafka.Start (link) a partition producer.
Block calling process until it receives an acked reply for the CallRef
.
Types
-type call_ref() :: brod:call_ref().
-type config() :: proplists:proplist().
-type conn() :: kpro:connection().
-type milli_sec() :: non_neg_integer().
-type offset() :: brod:offset().
-type partition() :: brod:partition().
-type state() :: #state{client_pid :: pid(), topic :: topic(), partition :: partition(), connection :: undefined | conn(), conn_mref :: undefined | reference(), buffer :: brod_producer_buffer:buf(), retry_backoff_ms :: non_neg_integer(), retry_tref :: undefined | reference(), delay_send_ref :: delay_send_ref(), produce_req_vsn :: {default | resolved | configured, brod_kafka_apis:vsn()}}.
-type topic() :: brod:topic().
Functions
-spec produce(pid(), brod:key(), brod:value()) -> {ok, call_ref()} | {error, any()}.
Produce a message to partition asynchronously.
The call is blocked until the request has been buffered in producer worker The function returns a call reference of typecall_ref()
to the caller so the caller can used it to expect (match) a #brod_produce_reply{result = brod_produce_req_acked}
message after the produce request has been acked by kafka.
-spec produce_cb(pid(), brod:key(), brod:value(), undefined | brod:produce_ack_cb()) -> ok | {ok, call_ref()} | {error, any()}.
AckCb
is a function otherwise send #brod_produce_reply{result = brod_produce_req_acked}
message to caller after the produce request has been acked by kafka.
-spec produce_no_ack(pid(), brod:key(), brod:value()) -> ok.
Start (link) a partition producer.
Possible configs (passed as a proplist):
How many acknowledgements the kafka broker should receive from the clustered replicas before acking producer. 0: the broker will not send any response (this is the only case where the broker will not reply to a request) 1: The leader will wait the data is written to the local log before sending a response. -1: If it is -1 the broker will block until the message is committed by all in sync replicas before acking.required_acks
(optional, default = -1):
Maximum time in milliseconds the broker can await the receipt of the number of acknowledgements inack_timeout
(optional, default = 10000 ms):RequiredAcks
. The timeout is not an exact limit on the request time for a few reasons: (1) it does not include network latency, (2) the timer begins at the beginning of the processing of this request so if many requests are queued due to broker overload that wait time will not be included, (3) kafka leader will not terminate a local write so if the local write time exceeds this timeout it will not be respected.
How many requests (per-partition) can be buffered without blocking the caller. The callers are released (by receiving the 'brod_produce_req_buffered' reply) once the request is taken into buffer and after the request has been put on wire, then the caller may expect a reply 'brod_produce_req_acked' when the request is accepted by kafka.partition_buffer_limit
(optional, default = 256):partition_onwire_limit
(optional, default = 1):How many message sets (per-partition) can be sent to kafka broker asynchronously before receiving ACKs from broker.
NOTE: setting a number greater than 1 may cause messages being persisted in an order different from the order they were produced.max_batch_size
(in bytes, optional, default = 1M):In case callers are producing faster than brokers can handle (or congestion on wire), try to accumulate small requests into batches as much as possible but not exceeding max_batch_size.
OBS: If compression is enabled, care should be taken when picking the max batch size, because a compressed batch will be produced as one message and this message might be larger than 'max.message.bytes' in kafka config (or topic config)
Ifmax_retries
(optional, default = 3):{max_retries, N}
is given, the producer retry produce request for N times before crashing in case of failures like connection being shutdown by remote or exceptions received in produce response from kafka. The special value N = -1 means "retry indefinitely"
Time in milli-seconds to sleep before retry the failed produce request.retry_backoff_ms
(optional, default = 500);compression
(optional, default =no_compression`): `gzip
orsnappy
to enable compressionmax_linger_ms
(optional, default = 0):Messages are allowed to 'linger' in buffer for this amount of milli-seconds before being sent. Definition of 'linger': A message is in "linger" state when it is allowed to be sent on-wire, but chosen not to (for better batching).
The default value is 0 for 2 reasons:- Backward compatibility (for 2.x releases)
- Not to surprise
brod:produce_sync
callers
max_linger_count
(optional, default = 0):At most this amount (count not size) of messages are allowed to "linger" in buffer. Messages will be sent regardless of "linger" age when this threshold is hit.
NOTE: It does not make sense to have this value set larger thanpartition_buffer_limit
User determined produce API version to use, discard the API version range received from kafka. This is to be used when a topic in newer version kafka is configured to store older version message format. e.g. When a topic in kafka 0.11 is configured to have message format 0.10, sending message with headers would result inproduce_req_vsn
(optional, default = undefined):unknown_server_error
error code.
-spec stop(pid()) -> ok.
-spec sync_produce_request(call_ref(), timeout()) -> {ok, offset()} | {error, Reason} when Reason :: timeout | {producer_down, any()}.
Block calling process until it receives an acked reply for the CallRef
.
produce/3
in which the call reference was created.