0.3.0 (2026-04-20)
Breaking changes and reliability rework.
This release removes the ETS/disk-log buffer layer and replaces it with proper MQTT protocol-level backpressure. The result is a simpler, more reliable producer that correctly participates in QoS 1/2 guarantees.
Breaking changes
- ETS buffer removed. The options buffer_size, buffer_overflow_strategy, buffer_durability, and buffer_log_dir no longer exist. Remove them from your configuration. Backpressure is now handled entirely by max_inflight and the MQTT protocol.
- clean_start defaults to false (was true). With the old default, the broker discarded the session on every reconnect, silently losing all unACKed QoS 1/2 messages. The new default preserves the session so the broker redelivers unACKed messages after a producer restart. If you explicitly want a fresh session on each connect, set config: [clean_start: true].
- topics is now required. Previously it defaulted to [], starting a producer that subscribed to nothing. Omitting topics now raises at startup.
- MessageHandler behaviour simplified. The callbacks handle_connect/1, handle_disconnect/1, and handle_pubrel/1 have been removed. Only handle_message/3 remains. Custom handlers implementing the removed callbacks must delete them.
- concurrency > 1 requires shared_group. Starting a pipeline with multiple producer instances and no shared_group now raises at startup. Without shared subscriptions every producer receives every message, causing duplicates.
- Client ID suffix _N is now always appended. Each producer instance connects with {clientid}_0, {clientid}_1, etc. A pipeline that previously connected as my-client now connects as my-client_0. If you have a persistent broker session keyed by the exact client ID, update the clientid in your config to match the new suffix (e.g. clientid: "my-client_0") or accept that the session will be treated as new on first connect.
- emqtt bumped to ~> 1.14, cowlib to ~> 2.13.0.
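To make the client ID suffix and shared-subscription changes concrete, here is a minimal sketch of the identifiers the broker would see for a pipeline with several producer instances. The module MigrationSketch and its functions are hypothetical and are not part of off_broadway_emqtt; they only reproduce the {clientid}_N pattern and the standard MQTT $share/{group}/{topic} filter format.

```elixir
defmodule MigrationSketch do
  # Hypothetical helper for illustration only; the library derives
  # these values itself, you do not need to call anything like this.

  # Client IDs used by `concurrency` producer instances: base_0, base_1, ...
  def client_ids(base, concurrency) when concurrency >= 1 do
    for index <- 0..(concurrency - 1), do: "#{base}_#{index}"
  end

  # Standard MQTT shared-subscription topic filter for a group and topic.
  def shared_topic(group, topic), do: "$share/#{group}/#{topic}"
end

MigrationSketch.client_ids("my-client", 3)
# => ["my-client_0", "my-client_1", "my-client_2"]

MigrationSketch.shared_topic("my-pipeline", "my/topic")
# => "$share/my-pipeline/my/topic"
```

This can be handy when checking broker-side session lists or ACL rules that match on the exact client ID.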
Bug fixes
- QoS 2 acknowledgement fixed. The pubcomp step was incorrectly calling :emqtt.pubrec instead of :emqtt.pubcomp, breaking the QoS 2 handshake entirely. QoS 2 exactly-once delivery now works correctly.
New features
- Protocol-level backpressure via max_inflight. The broker stops delivering new messages once max_inflight unACKed QoS 1/2 messages are outstanding. Default is 100. For MQTT v5, Receive-Maximum is automatically set in the CONNECT properties so the broker enforces the limit server-side.
- shared_group option for distributing messages across multiple producer instances using MQTT shared subscriptions ($share/{group}/{topic}).
- New config options: reconnect, reconnect_timeout, low_mem.
- connection: :down telemetry event emitted when the MQTT connection is lost, with %{client_id: string, producer_index: integer, reason: term} metadata.
- Clearer startup errors. A Logger.error message including host and port is emitted when the producer fails to connect to the broker.
- persistent_term cleanup. The ack-options entry written at pipeline startup is now erased when the pipeline stops, preventing accumulation in long-running applications that start and stop pipelines dynamically.
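As a rough illustration of consuming the connection-down telemetry event, the sketch below attaches a handler that logs the metadata fields listed above. The full event name is an assumption (the entry does not spell it out), as are the module name ConnectionDownLogger and the handler ID; check the library documentation for the exact event before relying on it.

```elixir
defmodule ConnectionDownLogger do
  require Logger

  # ASSUMPTION: the exact event name is not given in this changelog.
  # Replace it with the documented one.
  @event [:off_broadway_emqtt, :connection, :down]

  # Attach once, e.g. from your application's start/2 callback.
  def attach do
    :telemetry.attach("log-emqtt-connection-down", @event, &__MODULE__.handle_event/4, nil)
  end

  # Telemetry handlers receive event name, measurements, metadata and config.
  def handle_event(_event, _measurements, metadata, _config) do
    Logger.warning(
      "MQTT connection lost: client_id=#{metadata.client_id} " <>
        "producer_index=#{metadata.producer_index} reason=#{inspect(metadata.reason)}"
    )
  end
end
```

Passing the handler as a remote capture (&__MODULE__.handle_event/4) rather than an anonymous function follows the recommendation in the telemetry documentation.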
Migration from v0.2.x
Remove buffer options
The ETS/disk-log buffer has been removed. Delete these options from your producer config:
# Remove all of these:
buffer_size: 10_000,
buffer_overflow_strategy: :drop_head,
buffer_durability: :durable,
buffer_log_dir: System.tmp_dir!(),
Backpressure is now handled by max_inflight.
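If your producer options are assembled programmatically (for example from application config shared across releases), one possible approach is to strip the removed keys before passing the options on. This is only a sketch: the option values and the host setting are illustrative placeholders.

```elixir
# Keys that no longer exist in 0.3.0.
removed = [:buffer_size, :buffer_overflow_strategy, :buffer_durability, :buffer_log_dir]

# Illustrative 0.2.x-style options; the values are placeholders.
old_opts = [
  topics: [{"my/topic", 1}],
  buffer_size: 10_000,
  buffer_overflow_strategy: :drop_head,
  config: [host: "localhost"]
]

# Keyword.drop/2 removes every occurrence of the listed keys
# and keeps the remaining entries in their original order.
new_opts = Keyword.drop(old_opts, removed)
# new_opts == [topics: [{"my/topic", 1}], config: [host: "localhost"]]
```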
Review clean_start
clean_start now defaults to false (was true). This is the safer default: the broker
redelivers unACKed QoS 1/2 messages after a restart instead of discarding them.
If your pipeline was relying on the broker discarding the session on reconnect, add
config: [clean_start: true] explicitly. Be aware this means unACKed messages are lost
on every restart.
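For reference, opting back into the old behaviour might look like the fragment below. It reuses the producer shape shown elsewhere in this guide; the topic name is a placeholder.

```elixir
producer: [
  module:
    {OffBroadway.EMQTT.Producer,
     topics: [{"my/topic", 1}],
     # Explicitly request a fresh session on every connect.
     # UnACKed QoS 1/2 messages are lost on each restart.
     config: [clean_start: true]}
]
```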
Remove MessageHandler callbacks
If you implemented a custom MessageHandler, delete any handle_connect/1, handle_disconnect/1,
or handle_pubrel/1 callbacks. Only handle_message/3 is part of the behaviour.
Add shared_group for concurrency > 1
If you were running with concurrency > 1, you must now add shared_group:
# Before (would cause duplicate messages):
producer: [
  module: {OffBroadway.EMQTT.Producer, topics: [{"my/topic", 1}], config: [...]},
  concurrency: 3
]
# After:
producer: [
  module: {OffBroadway.EMQTT.Producer,
    topics: [{"my/topic", 1}],
    shared_group: "my-pipeline",
    config: [...]
  },
  concurrency: 3
]
Update dependencies
{:off_broadway_emqtt, "~> 0.3.0"}
0.2.1 (2025-06-05)
- If :clean_start option is true, truncate the buffer log file and skip replay when the producer starts.
- Properly disconnect from the MQTT broker on terminate.
0.2.0 (2025-06-03)
- Add support for wrapping the ETS buffer cache with :disk_log to persist cached messages for producer.
- Introduced new option buffer_durability which can be either :durable or :transient. When :durable, messages will be persisted to disk to ensure messages are not lost if the producer crashes. Defaults to :transient (in-memory buffer only).
- New option buffer_log_dir can be either a string, or a zero-arity function that returns the directory to store buffer logs.
- Added new telemetry events for :durable buffer operations.
0.1.1 (unreleased)
Never tagged. Changes rolled into later releases.
- Emitting off_broadway_emqtt.receive_message.ack reads message topic from message receipt instead of from the message body. This ensures that topic is included in telemetry events even if the message has been altered during dispatch.
- Move emqtt.start_link/1 and emqtt.connect/1 to a handle_continue/2 callback to prevent blocking GenServer.init/1.
- Convert host and server_name_indication to charlist when validating options.
- Return new state from handle_continue on connection error.
- Publish the payload field as message data, and the rest as metadata.
0.1.0 (2024-09-24)
Initial release.
The initial release supports connecting to an MQTT broker using emqtt and consuming messages using a Broadway pipeline.
Supported features
- [x] Support most emqtt configurable options as producer config options.
- [x] Specify buffer size and overflow strategy for the ets table buffer.
- [x] OffBroadway.EMQTT.MessageHandler behaviour to support overriding default implementation.
- [x] Telemetry events for observability.