0.3.0 (2026-04-20)

Breaking changes and reliability rework.

This release removes the ETS/disk-log buffer layer and replaces it with proper MQTT protocol-level backpressure. The result is a simpler, more reliable producer that correctly participates in QoS 1/2 guarantees.

Breaking changes

  • ETS buffer removed. The options buffer_size, buffer_overflow_strategy, buffer_durability, and buffer_log_dir no longer exist. Remove them from your configuration. Backpressure is now handled entirely by max_inflight and the MQTT protocol.
  • clean_start defaults to false (was true). With the old default, the broker discarded the session on every reconnect, silently losing all unACKed QoS 1/2 messages. The new default preserves the session so the broker redelivers unACKed messages after a producer restart. If you explicitly want a fresh session on each connect, set config: [clean_start: true].
  • topics is now required. Previously it defaulted to [], starting a producer that subscribed to nothing. Omitting topics now raises at startup.
  • MessageHandler behaviour simplified. The callbacks handle_connect/1, handle_disconnect/1, and handle_pubrel/1 have been removed. Only handle_message/3 remains. Custom handlers implementing the removed callbacks must delete them.
  • concurrency > 1 requires shared_group. Starting a pipeline with multiple producer instances and no shared_group now raises at startup. Without shared subscriptions every producer receives every message, causing duplicates.
  • Client ID suffix _N is now always appended. Each producer instance connects with {clientid}_0, {clientid}_1, etc. A pipeline that previously connected as my-client now connects as my-client_0. If you have a persistent broker session keyed by the exact client ID, update the clientid in your config to match the new suffix (e.g. clientid: "my-client_0") or accept that the session will be treated as new on first connect.
  • emqtt bumped to ~> 1.14, cowlib to ~> 2.13.0.

Bug fixes

  • QoS 2 acknowledgement fixed. The pubcomp step was incorrectly calling :emqtt.pubrec instead of :emqtt.pubcomp, breaking the QoS 2 handshake entirely. QoS 2 exactly-once delivery now works correctly.

New features

  • Protocol-level backpressure via max_inflight. The broker stops delivering new messages once max_inflight unACKed QoS 1/2 messages are outstanding. Default is 100. For MQTT v5, Receive-Maximum is automatically set in the CONNECT properties so the broker enforces the limit server-side.
  • shared_group option for distributing messages across multiple producer instances using MQTT shared subscriptions ($share/{group}/{topic}).
  • New config options: reconnect, reconnect_timeout, low_mem.
  • connection: :down telemetry event emitted when the MQTT connection is lost, with %{client_id: string, producer_index: integer, reason: term} metadata.
  • Clearer startup errors. A Logger.error message including host and port is emitted when the producer fails to connect to the broker.
  • persistent_term cleanup. The ack-options entry written at pipeline startup is now erased when the pipeline stops, preventing accumulation in long-running applications that start and stop pipelines dynamically.

Migration from v0.2.x

Remove buffer options

The ETS/disk-log buffer has been removed. Delete these options from your producer config:

# Remove all of these:
buffer_size: 10_000,
buffer_overflow_strategy: :drop_head,
buffer_durability: :durable,
buffer_log_dir: System.tmp_dir!(),

Backpressure is now handled by max_inflight.

Review clean_start

clean_start now defaults to false (was true). This is the safer default: the broker redelivers unACKed QoS 1/2 messages after a restart instead of discarding them.

If your pipeline was relying on the broker discarding the session on reconnect, add config: [clean_start: true] explicitly. Be aware this means unACKed messages are lost on every restart.

Remove MessageHandler callbacks

If you implemented a custom MessageHandler, delete any handle_connect/1, handle_disconnect/1, or handle_pubrel/1 callbacks. Only handle_message/3 is part of the behaviour.

Add shared_group for concurrency > 1

If you were running with concurrency > 1, you must now add shared_group:

# Before (would cause duplicate messages):
producer: [
  module: {OffBroadway.EMQTT.Producer, topics: [{"my/topic", 1}], config: [...]},
  concurrency: 3
]

# After:
producer: [
  module: {OffBroadway.EMQTT.Producer,
    topics: [{"my/topic", 1}],
    shared_group: "my-pipeline",
    config: [...]
  },
  concurrency: 3
]

Update dependencies

{:off_broadway_emqtt, "~> 0.3.0"}

0.2.1 (2025-06-05)

  • If :clean_start option is true, truncate the buffer log file and skip replay when the producer starts.
  • Properly disconnect from the MQTT broker on terminate.

0.2.0 (2025-06-03)

  • Add support for wrapping the ETS buffer cache with :disk_log to persist cached messages for producer.
    • Introduced new option buffer_durability which can be either :durable or :transient. When :durable, messages will be persisted to disk to ensure messages are not lost if the producer crashes. Defaults to :transient (in-memory buffer only).
    • New option buffer_log_dir can be either a string, or a zero-arity function that returns the directory to store buffer logs.
    • Added new telemetry events for :durable buffer operations.

0.1.1 (unreleased)

Never tagged. Changes rolled into later releases.

  • Emitting off_broadway_emqtt.receive_message.ack reads message topic from message receipt instead of from the message body. This ensures that topic is included in telemetry events even if the message has been altered during dispatch.
  • Move emqtt.start_link/1 and emqtt.connect/1 to a handle_continue/2 callback to prevent blocking GenServer.init/1.
  • Convert host and server_name_indication to charlist when validating options.
  • Return new state from handle_continue on connection error.
  • Publish the payload field as message data, and the rest as metadata.

0.1.0 (2024-09-24)

Initial release.

The initial release supports connecting to an MQTT broker using emqtt, and consume messages using a Broadway pipeline.

Supported features

  • [x] Support most emqtt configurable options as producer config options.
  • [x] Specify buffer size and overflow strategy for the ets table buffer.
  • [x] OffBroadway.EMQTT.MessageHandler behaviour to support overriding default implementation.
  • [x] Telemetry events for observability.