Behaviours: gen_server.
call_ref() = brod:call_ref()
config() = proplists:proplist()
offset() = brod:offset()
partition() = brod:partition()
topic() = brod:topic()
produce/3 | Produce a message to partition asynchronizely. |
produce_cb/4 | Async produce, evaluate callback if AckCb is a function
otherwise send #brod_produce_reply{result = brod_produce_req_acked}
message to caller after the produce request has been acked by kafka. |
produce_no_ack/3 | Fire-n-forget, no ack, no back-pressure. |
start_link/4 | Start (link) a partition producer. |
stop/1 | Stop the process. |
sync_produce_request/2 | Block calling process until it receives an acked reply for the
CallRef . |
produce(Pid::pid(), Key::brod:key(), Value::brod:value()) -> {ok, call_ref()} | {error, any()}
Produce a message to partition asynchronizely.
The call is blocked until the request has been buffered in producer worker
The function returns a call reference of type call_ref()
to the
caller so the caller can used it to expect (match) a
#brod_produce_reply{result = brod_produce_req_acked}
message after the produce request has been acked by kafka.
produce_cb(Pid::pid(), Key::brod:key(), Value::brod:value(), AckCb::undefined | brod:produce_ack_cb()) -> ok | {ok, call_ref()} | {error, any()}
Async produce, evaluate callback if AckCb
is a function
otherwise send #brod_produce_reply{result = brod_produce_req_acked}
message to caller after the produce request has been acked by kafka.
produce_no_ack(Pid::pid(), Key::brod:key(), Value::brod:value()) -> ok
Fire-n-forget, no ack, no back-pressure.
start_link(ClientPid::pid(), Topic::topic(), Partition::partition(), Config::config()) -> {ok, pid()}
Start (link) a partition producer.
Possible configs:required_acks
(optional, default = -1):
ack_timeout
(optional, default = 10000 ms):
RequiredAcks
. The timeout is not an exact
limit on the request time for a few reasons: (1) it does not include
network latency, (2) the timer begins at the beginning of the processing
of this request so if many requests are queued due to broker overload
that wait time will not be included, (3) kafka leader will not terminate
a local write so if the local write time exceeds this timeout it will
not be respected.partition_buffer_limit
(optional, default = 256):
partition_onwire_limit
(optional, default = 1):
How many message sets (per-partition) can be sent to kafka broker asynchronously before receiving ACKs from broker.
NOTE: setting a number greater than 1 may cause messages being persisted in an order different from the order they were produced.max_batch_size
(in bytes, optional, default = 1M):
In case callers are producing faster than brokers can handle (or congestion on wire), try to accumulate small requests into batches as much as possible but not exceeding max_batch_size.
OBS: If compression is enabled, care should be taken when picking the max batch size, because a compressed batch will be produced as one message and this message might be larger than 'max.message.bytes' in kafka config (or topic config)max_retries
(optional, default = 3):
{max_retries, N}
is given, the producer retry produce request for
N times before crashing in case of failures like connection being
shutdown by remote or exceptions received in produce response from kafka.
The special value N = -1 means "retry indefinitely"retry_backoff_ms
(optional, default = 500);
compression
(optional, default = no_compression):
gzip
or snappy
to enable compressionmax_linger_ms
(optional, default = 0):
Messages are allowed to 'linger' in buffer for this amount of milli-seconds before being sent. Definition of 'linger': A message is in "linger" state when it is allowed to be sent on-wire, but chosen not to (for better batching).
The default value is 0 for 2 reasons:brod:produce_sync
callersmax_linger_count
(optional, default = 0):
At most this amount (count not size) of messages are allowed to "linger" in buffer. Messages will be sent regardless of "linger" age when this threshold is hit.
NOTE: It does not make sense to have this value set larger thanpartition_buffer_limit
produce_req_vsn
(optional, default = undefined):
unknown_server_error
error code.stop(Pid::pid()) -> ok
Stop the process
sync_produce_request(CallRef::call_ref(), Timeout::timeout()) -> {ok, offset()} | {error, Reason}
Block calling process until it receives an acked reply for the
CallRef
. The caller pid of this function must be the caller of
produce/3
in which the call reference was created.
Generated by EDoc