BroadwayKafka.Producer (BroadwayKafka v0.4.4)

A Kafka connector for Broadway.

BroadwayKafka can subscribe as a consumer to one or more topics and process streams of records within the same consumer group. Communication is done through Kafka's Consumer API using the :brod client.

Options

  • :hosts - Required. A list of host/port tuples, or a single string of comma-separated HOST:PORT pairs, used to establish the initial connection to Kafka, e.g. [localhost: 9092]. Examples:

    # Keyword
    ["kafka-vm1": 9092, "kafka-vm2": 9092, "kafka-vm3": 9092]
    
    # List of tuples
    [{"kafka-vm1", 9092}, {"kafka-vm2", 9092}, {"kafka-vm3", 9092}]
    
    # String
    "kafka-vm1:9092,kafka-vm2:9092,kafka-vm3:9092"
  • :group_id - Required. A unique string that identifies the consumer group the producer will belong to.

  • :topics - Required. A list of topics that the producer will subscribe to.

  • :receive_interval - Optional. The duration (in milliseconds) for which the producer waits before making a request for more messages. Default is 2000 (2 seconds).

  • :offset_commit_on_ack - Optional. Whether Broadway sends an offset commit request after each acknowledgement. Default is true. Setting this value to false can increase performance, since commit requests will then follow the :offset_commit_interval_seconds option instead. However, long commit intervals might cause a large number of duplicated records to be processed after a server restart or connection loss. If that's the case, make sure your logic is idempotent when consuming records to avoid inconsistencies. Also, bear in mind that the negative performance impact of committing after every acknowledgement might be insignificant if you're using batchers, since only one commit request is performed per batch.

  • :offset_reset_policy - Optional. Defines the offset to be used when there's no initial offset in Kafka or if the current offset has expired. Possible values are :earliest, :latest or {:timestamp, timestamp} (in milliseconds). Default is :latest.

  • :begin_offset - Optional. Defines how to get the initial offset for the consumers. The possible values are :assigned or :reset. When set to :assigned, the starting offsets will be the ones returned in the Kafka partition assignments (the latest committed offsets for the consumer group). When set to :reset, the starting offset will be dictated by the :offset_reset_policy option, either starting from the :earliest or the :latest offsets of the topic. Default is :assigned.

  • :shared_client - Optional. When false, it starts one client per producer. When true, it starts a single shared client across all producers (which may reduce memory/resource usage). Default is false.

  • :group_config - Optional. A list of options used to configure the group coordinator. See the "Group config options" section below for a list of all available options.

  • :fetch_config - Optional. A list of options used when fetching messages. See the "Fetch config options" section below for a list of all available options.

  • :client_config - Optional. A list of options used when creating the client. See the "Client config options" section below for a list of all available options.
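
As a rough illustration of how the offset-related options above combine, the sketch below configures a producer that commits on an interval instead of after every acknowledgement, and that falls back to the earliest offset when the group has no valid committed offset. The host, group and topic names are placeholders, and the chosen values are assumptions made for illustration rather than recommendations.

```elixir
# Producer options only; this keyword list is what would be passed as
# `module: {BroadwayKafka.Producer, producer_opts}`.
producer_opts = [
  hosts: [localhost: 9092],
  group_id: "group_1",
  topics: ["test"],
  # Ask for more messages every second instead of the default 2000 ms.
  receive_interval: 1_000,
  # Do not commit after each ack; commits then follow
  # :offset_commit_interval_seconds from :group_config.
  offset_commit_on_ack: false,
  # Used when Kafka has no initial offset or the current one has expired.
  offset_reset_policy: :earliest,
  # Start from the offsets returned in the partition assignments (the default).
  begin_offset: :assigned,
  group_config: [offset_commit_interval_seconds: 10]
]
```

With a setup like this, records processed since the last commit may be delivered again after a restart, so the consuming logic should be idempotent.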

Group config options

The available options that will be passed to :brod's group coordinator.

  • :offset_commit_interval_seconds - Optional. The time interval between two OffsetCommitRequest messages. Default is 5.

  • :rejoin_delay_seconds - Optional. Delay in seconds before rejoining the group. Default is 1.

  • :session_timeout_seconds - Optional. Time in seconds the group coordinator broker waits before considering a member 'down' if no heartbeat or any kind of request is received. A group member may also consider the coordinator broker 'down' if no heartbeat response is received in the past N seconds. Default is 30 seconds.

  • :heartbeat_rate_seconds - Optional. Interval in seconds at which a member 'pings' the group coordinator. Heartbeats are used to ensure that the consumer's session stays active and to facilitate rebalancing when consumers join or leave the group. The value must be set lower than :session_timeout_seconds, typically equal to or lower than 1/3 of that value. It can be adjusted even lower to control the expected time for normal rebalances. Default is 5 seconds.

  • :rebalance_timeout_seconds - Optional. Time in seconds for each worker to join the group once a rebalance has begun. If the timeout is exceeded, then the worker will be removed from the group, which will cause offset commit failures. Default is 30.
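
A possible :group_config, shown only to make the relationship between the heartbeat rate and the session timeout concrete. The numbers are illustrative assumptions; the only constraint taken from the text above is that the heartbeat rate stays at or below one third of the session timeout.

```elixir
# Passed to the producer as `group_config: group_config`.
group_config = [
  session_timeout_seconds: 30,
  # 10 is exactly 1/3 of the session timeout; lower values detect
  # membership changes sooner at the cost of more heartbeat traffic.
  heartbeat_rate_seconds: 10,
  # Give each worker up to a minute to rejoin once a rebalance starts.
  rebalance_timeout_seconds: 60,
  offset_commit_interval_seconds: 5,
  rejoin_delay_seconds: 1
]
```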

Fetch config options

The available options that will be internally passed to :brod.fetch/5.

  • :min_bytes - Optional. The minimum amount of data to be fetched from the server. If not enough data is available the request will wait for that much data to accumulate before answering. Default is 1 byte. Setting this value greater than 1 can improve server throughput a bit at the cost of additional latency.

  • :max_bytes - Optional. The maximum amount of data to be fetched at a time from a single partition. Default is 1048576 (1 MiB). Setting greater values can improve server throughput at the cost of more memory consumption.

  • :max_wait_time - Optional. The maximum time, in milliseconds, that the broker is allowed to wait while collecting min_bytes of messages for a fetch response. Default is 1000 ms.
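
The three fetch options interact: the broker answers as soon as :min_bytes is available or :max_wait_time has elapsed, whichever comes first. A minimal sketch of a throughput-oriented setup follows; the values are assumptions for illustration and should be tuned against real traffic.

```elixir
# Passed to the producer as `fetch_config: fetch_config`.
fetch_config = [
  # Wait until at least 64 KiB has accumulated...
  min_bytes: 65_536,
  # ...but never longer than 500 ms.
  max_wait_time: 500,
  # Allow up to 2 MiB per partition per fetch (default is 1 MiB).
  max_bytes: 2_097_152
]
```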

Client config options

The available options that will be internally passed to :brod.start_client/3.

  • :client_id_prefix - Optional. A string that will be used to build the client id passed to :brod. The example value client_id_prefix: :"#{Node.self()} - " would generate a connection log like the following from our integration tests:

    20:41:37.717 [info]      :supervisor: {:local, :brod_sup}
    :started: [
      pid: #PID<0.286.0>,
      id: :"nonode@nohost - Elixir.BroadwayKafka.ConsumerTest.MyBroadway.Broadway.Producer_0.Client",
      mfargs: {:brod_client, :start_link,
       [
         [localhost: 9092],
         :"nonode@nohost - Elixir.BroadwayKafka.ConsumerTest.MyBroadway.Broadway.Producer_0.Client",
         [client_id_prefix: :"nonode@nohost - "]
       ]},
      restart_type: {:permanent, 10},
      shutdown: 5000,
      child_type: :worker
    ]
  • :sasl - Optional. A tuple of mechanism (:plain, :scram_sha_256 or :scram_sha_512), username and password. See :brod's Authentication Support documentation for more information. Default is no sasl options.

  • :ssl - Optional. A boolean or a list of options to use when connecting via SSL/TLS. See the tls_client_option documentation for more information. Default is no ssl options.

  • :connect_timeout - Optional. Time in milliseconds to be used as a timeout for :brod's communication with Kafka. Default is to use :brod's default timeout which is currently 5 seconds.

  • :request_timeout - Optional. Time in milliseconds to be used as a timeout for waiting response from Kafka. Default is to use :brod's default timeout which is currently 240 seconds.

  • :extra_sock_opts - Optional. A list of gen_tcp socket options. Set to [:inet6] if your Kafka broker uses IPv6.
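
For a broker that requires TLS and SASL authentication, the client options might be combined roughly as follows. The environment variable names and the prefix are hypothetical, and the timeouts are illustrative assumptions.

```elixir
# Passed to the producer as `client_config: client_config`.
client_config = [
  client_id_prefix: "my_app - ",
  # `true` enables SSL/TLS; a list of TLS client options is also accepted.
  ssl: true,
  # {mechanism, username, password}; KAFKA_USERNAME and KAFKA_PASSWORD are
  # hypothetical variable names used only for this sketch.
  sasl: {:plain, System.fetch_env!("KAFKA_USERNAME"), System.fetch_env!("KAFKA_PASSWORD")},
  # Both timeouts are in milliseconds.
  connect_timeout: 10_000,
  request_timeout: 60_000
]
```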

Note: Currently, Broadway does not support all options provided by :brod. If you have a scenario where you need any extra option that is not listed above, please open an issue, so we can consider adding it.

Example

Broadway.start_link(MyBroadway,
  name: MyBroadway,
  producer: [
    module: {BroadwayKafka.Producer, [
      hosts: [localhost: 9092],
      group_id: "group_1",
      topics: ["test"]
    ]},
    concurrency: 1
  ],
  processors: [
    default: [
      concurrency: 10
    ]
  ]
)

Concurrency and partitioning

The concurrency model provided by Kafka is based on partitioning, i.e., the more partitions you have, the more concurrency you get. However, in order to take advantage of this model you need to set up the :concurrency options for your processors and batchers accordingly. Having fewer processors than assigned topic/partitions will result in individual processors handling more than one partition, decreasing the overall level of concurrency. Therefore, if you want to always be able to process messages at maximum concurrency (assuming you have enough resources to do it), you should increase the concurrency up front to make sure you have enough processors to handle the extra messages received from newly assigned partitions.

Note: Even if you don't plan to add more partitions to a Kafka topic, your pipeline can still receive more assignments than planned. For instance, if another consumer crashes, the server will reassign all of its topic/partitions to other available consumers, including any Broadway producer subscribed to the same topic.
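
One way to size the pipeline up front is to derive the processor and batcher concurrency from the largest number of partitions the node could ever be assigned. The sketch below assumes a hypothetical total of 12 partitions and a MyBroadway module that also implements handle_batch/4; both are assumptions made for illustration.

```elixir
# Assumption: the subscribed topics have 12 partitions in total, and all of
# them could be assigned to this node if the other consumers go away.
max_partitions = 12

Broadway.start_link(MyBroadway,
  name: MyBroadway,
  producer: [
    module:
      {BroadwayKafka.Producer,
       [
         hosts: [localhost: 9092],
         group_id: "group_1",
         topics: ["test"]
       ]},
    concurrency: 1
  ],
  # One processor per partition that could be assigned, so no processor
  # has to handle more than one partition.
  processors: [default: [concurrency: max_partitions]],
  batchers: [default: [concurrency: max_partitions]]
)
```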

Handling failed messages

BroadwayKafka never stops the flow of the stream, i.e. it will always ack the messages even when they fail. Unlike queue-based connectors, where you can mark a single message as failed, Kafka does not allow that because of its single-offset-per-topic/partition ack strategy. If you want to reprocess failed messages, you need to roll your own strategy. A possible way to do that is to implement the Broadway.handle_failed/2 callback and send failed messages to a separate stream or queue for later processing.
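
A minimal sketch of such a strategy is shown below. MyApp.DeadLetter is a hypothetical helper, not part of Broadway or BroadwayKafka: it converts failed messages into plain maps and hands them to a publisher function, which defaults to printing the record and would be replaced by a real producer call in practice.

```elixir
defmodule MyApp.DeadLetter do
  @moduledoc false
  # Hypothetical helper for forwarding failed messages elsewhere.

  # Accepts %Broadway.Message{} structs (or any map with the same keys) and
  # keeps enough Kafka metadata to locate the original record later.
  def to_record(%{data: data, metadata: metadata, status: status}) do
    %{
      payload: data,
      reason: failure_reason(status),
      topic: metadata[:topic],
      partition: metadata[:partition],
      offset: metadata[:offset]
    }
  end

  # Placeholder publisher: pass a function that writes to a separate
  # stream or queue instead of the default IO.inspect/1.
  def publish_all(messages, publish_fun \\ &IO.inspect/1) do
    Enum.each(messages, fn message -> publish_fun.(to_record(message)) end)
    # Return the messages unchanged so the caller can return them as well.
    messages
  end

  # Failed Broadway messages carry a {:failed, reason} status.
  defp failure_reason({:failed, reason}), do: reason
  defp failure_reason(other), do: other
end
```

In a Broadway module this could be wired up as `def handle_failed(messages, _context), do: MyApp.DeadLetter.publish_all(messages)`. The offsets of the failed messages are still acknowledged, so the separate stream or queue becomes the only place they can be retried from.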

Message metadata

When producing messages, the following information will be passed to Broadway.Message's metadata.

  • topic - The topic the message was published to.

  • partition - The topic partition.

  • offset - The offset assigned to the message inside the partition.

  • key - The partition key.

  • ts - A timestamp associated with the message.

  • headers - The headers of the message.
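
The metadata fields listed above can be pattern matched like any other map. The helper below is a hypothetical example (MyApp.KafkaMetadata is not part of BroadwayKafka) that builds a short description of where a record came from, which can be handy in logs.

```elixir
defmodule MyApp.KafkaMetadata do
  @moduledoc false
  # Hypothetical helper: formats the origin of a record from the metadata
  # map attached to each Broadway.Message by the producer.
  def describe(%{topic: topic, partition: partition, offset: offset} = metadata) do
    # The key may be missing or empty, so read it without pattern matching.
    key = Map.get(metadata, :key)
    "#{topic}/#{partition}@#{offset} key=#{inspect(key)}"
  end
end
```

Inside handle_message/3 it would be called as `MyApp.KafkaMetadata.describe(message.metadata)`.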

Telemetry

This producer emits a few Telemetry events which are listed below.

  • [:broadway_kafka, :assignments_revoked, :start | :stop | :exception] spans - these events are emitted in "span style" when the producer receives an assignments-revoked call from the consumer group coordinator. See :telemetry.span/3.
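
A handler for these span events could look roughly like the sketch below. The module name, handler id and log messages are assumptions; the measurement and metadata keys used (:duration on :stop, :reason on :exception) are the ones :telemetry.span/3 documents for span events.

```elixir
defmodule MyApp.KafkaTelemetry do
  @moduledoc false
  require Logger

  @events [
    [:broadway_kafka, :assignments_revoked, :start],
    [:broadway_kafka, :assignments_revoked, :stop],
    [:broadway_kafka, :assignments_revoked, :exception]
  ]

  # Call once, e.g. from the application's start/2, before the pipeline starts.
  def attach do
    :telemetry.attach_many("my-app-kafka-assignments", @events, &__MODULE__.handle_event/4, nil)
  end

  def handle_event([:broadway_kafka, :assignments_revoked, :start], _measurements, _meta, _config) do
    Logger.info("Kafka assignments revoked: handling started")
  end

  def handle_event([:broadway_kafka, :assignments_revoked, :stop], %{duration: duration}, _meta, _config) do
    # Span durations are reported in native time units.
    ms = System.convert_time_unit(duration, :native, :millisecond)
    Logger.info("Kafka assignments revoked: handled in #{ms} ms")
  end

  def handle_event([:broadway_kafka, :assignments_revoked, :exception], _measurements, meta, _config) do
    Logger.error("Kafka assignments revoked: failed with #{inspect(meta[:reason])}")
  end
end
```

Using a captured module function rather than an anonymous function follows the recommendation in the :telemetry documentation for attached handlers.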

Functions

brod_received_assignment(args \\ []) (macro)

brod_received_assignment(record, args) (macro)

kafka_message(args \\ []) (macro)

kafka_message(record, args) (macro)