BroadwayKafka.Producer (BroadwayKafka v0.4.4)
A Kafka connector for Broadway.
BroadwayKafka can subscribe as a consumer to one or more topics and process streams of records within the same consumer group. Communication is done through Kafka's Consumer API using the :brod client.
Options
:hosts
- Required. A list of host and port tuples or a single string of comma-separated HOST:PORT pairs to use for establishing the initial connection to Kafka, e.g. [localhost: 9092]. Examples:

  # Keyword
  ["kafka-vm1": 9092, "kafka-vm2": 9092, "kafka-vm3": 9092]

  # List of tuples
  [{"kafka-vm1", 9092}, {"kafka-vm2", 9092}, {"kafka-vm3", 9092}]

  # String
  "kafka-vm1:9092,kafka-vm2:9092,kafka-vm3:9092"
:group_id
- Required. A unique string that identifies the consumer group the producer will belong to.
:topics
- Required. A list of topics that the producer will subscribe to.
:receive_interval
- Optional. The duration (in milliseconds) for which the producer waits before making a request for more messages. Default is 2000 (2 seconds).
:offset_commit_on_ack
- Optional. Tells Broadway whether to send an offset commit request after each acknowledgement. Default is true. Setting this value to false can increase performance since commit requests will respect the :offset_commit_interval_seconds option. However, setting long commit intervals might lead to a large number of duplicated records being processed after a server restart or connection loss. If that's the case, make sure your logic is idempotent when consuming records to avoid inconsistencies. Also, bear in mind that the negative performance impact might be insignificant if you're using batchers, since only one commit request will be performed per batch.
:offset_reset_policy
- Optional. Defines the offset to be used when there's no initial offset in Kafka or if the current offset has expired. Possible values are :earliest, :latest or {:timestamp, timestamp} (in milliseconds). Default is :latest.
:begin_offset
- Optional. Defines how to get the initial offset for the consumers. The possible values are :assigned or :reset. When set to :assigned, the starting offsets will be the ones returned in the Kafka partition assignments (the latest committed offsets for the consumer group). When set to :reset, the starting offset will be dictated by the :offset_reset_policy option, either starting from the :earliest or the :latest offsets of the topic. Default is :assigned.
:shared_client
- Optional. When false, it starts one client per producer. When true, it starts a single shared client across all producers (which may reduce memory/resource usage). Default is false.
:group_config
- Optional. A list of options used to configure the group coordinator. See the "Group config options" section below for a list of all available options.
:fetch_config
- Optional. A list of options used when fetching messages. See the "Fetch config options" section below for a list of all available options.
:client_config
- Optional. A list of options used when creating the client. See the "Client config options" section below for a list of all available options. A combined configuration sketch follows this list.
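To illustrate how these options fit together, here is a minimal sketch of the producer module tuple; all values are placeholders rather than recommendations:

  module: {BroadwayKafka.Producer, [
    hosts: [localhost: 9092],
    group_id: "group_1",
    topics: ["test"],
    receive_interval: 2000,
    offset_commit_on_ack: true,
    offset_reset_policy: :latest,
    begin_offset: :assigned,
    shared_client: false,
    group_config: [offset_commit_interval_seconds: 5],
    fetch_config: [max_bytes: 1_048_576],
    client_config: [connect_timeout: 5_000]
  ]}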
Group config options
The available options that will be passed to :brod's group coordinator.
:offset_commit_interval_seconds
- Optional. The time interval between two OffsetCommitRequest messages. Default is 5.
:rejoin_delay_seconds
- Optional. Delay in seconds before rejoining the group. Default is 1.
:session_timeout_seconds
- Optional. Time in seconds the group coordinator broker waits before considering a member 'down' if no heartbeat or any kind of request is received. A group member may also consider the coordinator broker 'down' if no heartbeat response has been received in the past N seconds. Default is 30 seconds.
:heartbeat_rate_seconds
- Optional. Time in seconds for a member to 'ping' the group coordinator. Heartbeats are used to ensure that the consumer's session stays active and to facilitate rebalancing when new consumers join or leave the group. The value must be set lower than :session_timeout_seconds, typically equal to or lower than 1/3 of that value. It can be adjusted even lower to control the expected time for normal rebalances. Default is 5 seconds.
:rebalance_timeout_seconds
- Optional. Time in seconds for each worker to join the group once a rebalance has begun. If the timeout is exceeded, the worker will be removed from the group, which will cause offset commit failures. Default is 30.
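As a sketch, the group coordinator could be tuned like this inside the producer options; the values are illustrative and should be adapted to your rebalance and commit requirements:

  group_config: [
    offset_commit_interval_seconds: 10,
    rejoin_delay_seconds: 2,
    session_timeout_seconds: 45,
    heartbeat_rate_seconds: 15,
    rebalance_timeout_seconds: 60
  ]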
Fetch config options
The available options that will be internally passed to :brod.fetch/5.
:min_bytes
- Optional. The minimum amount of data to be fetched from the server. If not enough data is available, the request will wait for that much data to accumulate before answering. Default is 1 byte. Setting this value greater than 1 can improve server throughput a bit at the cost of additional latency.
:max_bytes
- Optional. The maximum amount of data to be fetched at a time from a single partition. Default is 1048576 (1 MiB). Setting greater values can improve server throughput at the cost of more memory consumption.
:max_wait_time
- Optional. The maximum number of milliseconds allowed for the broker to collect min_bytes of messages in a fetch response. Default is 1000 ms.
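For example, a fetch configuration that trades a little latency for throughput might look like the following sketch (the numbers are illustrative, not recommendations):

  fetch_config: [
    min_bytes: 1_024,
    max_bytes: 2_097_152,
    max_wait_time: 1_500
  ]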
Client config options
The available options that will be internally passed to :brod.start_client/3.
:client_id_prefix
- Optional. A string that will be used to build the client id passed to :brod. The example value client_id_prefix: :"#{Node.self()} -" would generate the following connection log from our integration tests:

  20:41:37.717 [info] :supervisor: {:local, :brod_sup} :started: [
    pid: #PID<0.286.0>,
    id: :"nonode@nohost - Elixir.BroadwayKafka.ConsumerTest.MyBroadway.Broadway.Producer_0.Client",
    mfargs: {:brod_client, :start_link, [
      [localhost: 9092],
      :"nonode@nohost - Elixir.BroadwayKafka.ConsumerTest.MyBroadway.Broadway.Producer_0.Client",
      [client_id_prefix: :"nonode@nohost - "]
    ]},
    restart_type: {:permanent, 10},
    shutdown: 5000,
    child_type: :worker
  ]
:sasl
- Optional. A tuple of mechanism (which can be :plain, :scram_sha_256 or :scram_sha_512), username and password. See :brod's Authentication Support documentation for more information. Default is no sasl options.
:ssl
- Optional. A boolean or a list of options to use when connecting via SSL/TLS. See the tls_client_option documentation for more information. Default is no ssl options.
:connect_timeout
- Optional. Time in milliseconds to be used as a timeout for :brod's communication with Kafka. Default is to use :brod's default timeout, which is currently 5 seconds.
:request_timeout
- Optional. Time in milliseconds to be used as a timeout for waiting on a response from Kafka. Default is to use :brod's default timeout, which is currently 240 seconds.
:extra_sock_opts
- Optional. gen_tcp socket options. Set to [:inet6] if your Kafka broker uses IPv6.
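Putting several of these together, a client configuration sketch might look like this; the prefix, credentials, and timeouts are placeholders:

  client_config: [
    client_id_prefix: "my_app - ",
    sasl: {:plain, "username", "password"},
    ssl: true,
    connect_timeout: 10_000,
    extra_sock_opts: [:inet6]
  ]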
Note: Currently, Broadway does not support all options provided by :brod. If you have a scenario where you need any extra option that is not listed above, please open an issue, so we can consider adding it.
Example
  Broadway.start_link(MyBroadway,
    name: MyBroadway,
    producer: [
      module: {BroadwayKafka.Producer, [
        hosts: [localhost: 9092],
        group_id: "group_1",
        topics: ["test"]
      ]},
      concurrency: 1
    ],
    processors: [
      default: [
        concurrency: 10
      ]
    ]
  )
Concurrency and partitioning
The concurrency model provided by Kafka is based on partitioning, i.e., the more partitions
you have, the more concurrency you get. However, in order to take advantage of this model
you need to set up the :concurrency
options for your processors and batchers accordingly. Having
less concurrency than the number of topic/partitions assigned will result in individual processors handling more
than one partition, decreasing the overall level of concurrency. Therefore, if you want to
always be able to process messages at maximum concurrency (assuming you have enough resources
to do it), you should increase the concurrency up front to make sure you have enough
processors to handle the extra messages received from new partitions assigned.
Note: Even if you don't plan to add more partitions to a Kafka topic, your pipeline can still receive more assignments than planned. For instance, if another consumer crashes, the server will reassign all its topic/partitions to other available consumers, including any Broadway producer subscribed to the same topic.
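As a sketch, for a hypothetical topic with 12 partitions you might size the default processors so every partition can be handled concurrently:

  processors: [
    default: [
      # Hypothetical: one processor per partition for a 12-partition topic.
      concurrency: 12
    ]
  ]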
Handling failed messages
BroadwayKafka never stops the flow of the stream, i.e. it will always ack the messages even when they fail. Unlike queue-based connectors, where you can mark a single message as failed, in Kafka that's not possible due to its single offset per topic/partition ack strategy. If you want to reprocess failed messages, you need to roll your own strategy. A possible way to do that is to implement Broadway.handle_failed/2 and send failed messages to a separate stream or queue for later processing.
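A minimal sketch of this approach, assuming a hypothetical MyApp.DeadLetterQueue module that republishes failed payloads to a separate topic or queue:

  @impl true
  def handle_failed(messages, _context) do
    # Forward each failed payload for later reprocessing, then return the
    # messages so Broadway can ack them as usual.
    for message <- messages do
      MyApp.DeadLetterQueue.publish(message.data)
    end

    messages
  end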
Message metadata
When producing messages, the following information will be passed to Broadway.Message's metadata.
topic
- The topic the message was published to.
partition
- The topic partition.
offset
- The offset assigned to the message inside the partition.
key
- The partition key.
ts
- A timestamp associated with the message.
headers
- The headers of the message.
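For example, these fields can be read from message.metadata inside handle_message/3; the logging below is purely illustrative:

  @impl true
  def handle_message(_processor, message, _context) do
    %{topic: topic, partition: partition, offset: offset} = message.metadata
    IO.puts("Received record from #{topic}/#{partition} at offset #{offset}")
    message
  end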
Telemetry
This producer emits a few Telemetry events which are listed below.
[:broadway_kafka, :assignments_revoked, :start | :stop | :exception]
spans - These events are emitted in "span style" when receiving an assignments revoked call from the consumer group coordinator. See :telemetry.span/3.
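As a sketch, a handler could be attached to the :stop event of that span to observe how long handling revoked assignments takes; the handler id and logging are illustrative:

  :telemetry.attach(
    "log-assignments-revoked",
    [:broadway_kafka, :assignments_revoked, :stop],
    fn _event, measurements, _metadata, _config ->
      # :duration is reported in native time units by :telemetry.span/3.
      IO.inspect(measurements.duration, label: "assignments_revoked duration")
    end,
    nil
  )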