Macro for creating Pulsar consumer callback modules that support internal state.
This module provides a use macro that sets up your module as a consumer callback
with default implementations for optional callbacks.
Usage
Use this module in your consumer callback:
defmodule MyApp.MessageCounter do
use Pulsar.Consumer.Callback
# Implement required callbacks
def handle_message(message, state) do
# Process message
{:ok, state}
end
endWhen you use Pulsar.Consumer.Callback, your module automatically:
- Gets the
@behaviour Pulsar.Consumer.Callbackannotation - Receives default implementations for all optional callbacks
- Can override any callback by simply defining it
Required Callbacks
handle_message/2- Handle incoming messages with current state
Optional Callbacks
All optional callbacks have sensible defaults that you can override:
init/1- Initialize the callback module state (default:{:ok, nil})terminate/2- Cleanup when consumer terminates (default::ok)handle_call/3- Handle synchronous calls (default:{:reply, {:error, :not_implemented}, state})handle_cast/2- Handle asynchronous casts (default:{:noreply, state})handle_info/2- Handle other messages (default:{:noreply, state})
Message Format
handle_message/2 receives a Pulsar.Message struct containing all message information.
See Pulsar.Message for detailed field documentation.
You can pattern match on the struct:
def handle_message(%Pulsar.Message{payload: payload}, state) do
# Process the payload
{:ok, state}
endOr access fields directly:
def handle_message(%Pulsar.Message{} = message, state) do
payload = message.payload
message_id = message.message_id_to_ack
{:ok, state}
endExample Implementation
defmodule MyApp.MessageCounter do
use Pulsar.Consumer.Callback
def init(opts) do
max_messages = Keyword.get(opts, :max_messages, 1000)
{:ok, %{count: 0, max_messages: max_messages, messages: []}}
end
def handle_message(%Pulsar.Message{payload: payload}, callback_state) do
new_state = %{
callback_state
| count: callback_state.count + 1,
messages: [payload | callback_state.messages]
}
# Stop processing if we've reached the limit
if new_state.count >= new_state.max_messages do
{:stop, new_state}
else
{:ok, new_state}
end
end
def terminate(_reason, callback_state) do
IO.puts("Processed #{callback_state.count} messages")
:ok
end
# Optional: Add custom GenServer calls for external access
def handle_call(:get_count, _from, callback_state) do
{:reply, callback_state.count, callback_state}
end
def handle_call(:get_messages, _from, callback_state) do
{:reply, Enum.reverse(callback_state.messages), callback_state}
end
def handle_cast(:reset, callback_state) do
{:noreply, %{callback_state | count: 0, messages: []}}
end
endExtending the Consumer
The consumer callback module has default implementations for all optional callbacks. You can override these by simply defining the callback in your module:
defmodule MyApp.MessageCounter do
use Pulsar.Consumer.Callback
def init(opts) do
{:ok, %{count: 0, max: Keyword.get(opts, :max, 100)}}
end
def handle_message(message, state) do
{:ok, %{state | count: state.count + 1}}
end
# Override default handle_call to add custom functionality
def handle_call(:get_count, _from, state) do
{:reply, state.count, state}
end
# Override default handle_cast for custom async operations
def handle_cast(:reset, state) do
{:noreply, %{state | count: 0}}
end
endExample usage:
# Get current count via custom handle_call
count = GenServer.call(consumer_pid, :get_count)
# Reset state via custom handle_cast
GenServer.cast(consumer_pid, :reset)Multiple Consumers
For shared or key-shared subscriptions, you can start multiple consumer processes to increase throughput:
# Start 3 consumers for shared processing
{:ok, consumer_pids} = Pulsar.start_consumer(
topic,
subscription,
:Key_Shared,
MyCallback,
consumer_count: 3
)
# Each consumer maintains its own independent state
Enum.each(consumer_pids, fn consumer_pid ->
count = GenServer.call(consumer_pid, :get_count)
IO.puts("Consumer #{inspect(consumer_pid)} processed #{count} messages")
end)Return Values
init/1 (Optional)
If not implemented, defaults to {:ok, nil}.
{:ok, state}- Successful initialization with initial state{:error, reason}- Initialization failed
handle_message/2
{:ok, new_state}- Message processed successfully, acknowledge message automatically{:error, reason, new_state}- Processing failed, track for redelivery{:noreply, new_state}- Message processed, but don't automatically ACK/NACK. UsePulsar.Consumer.ack/2orPulsar.Consumer.nack/2for manual acknowledgment{:stop, new_state}- Message processed successfully, acknowledge, then stop consumer
terminate/2 (Optional)
If not implemented, defaults to :ok.
:ok- Cleanup completed successfully- Any other value is ignored
Manual Acknowledgment
When you return {:noreply, state} from handle_message/2, the message will NOT be automatically
acknowledged or negatively acknowledged. This gives you full control over when and how to ACK/NACK messages,
which is useful for:
- Broadway pipelines that handle acknowledgment in batches
- Async processing where acknowledgment happens after the callback returns
- Custom acknowledgment logic based on downstream processing
Example with manual ACK:
def handle_message(%Pulsar.Message{payload: payload, message_id_to_ack: message_id_to_ack}, state) do
# Use message_id_to_ack (not command.message_id) for manual ACK/NACK
# This ensures batch messages are ACKed with the correct batch_index
# Send to async processor
Task.async(fn ->
case process_async(payload) do
:ok -> Pulsar.Consumer.ack(self(), message_id_to_ack)
{:error, _} -> Pulsar.Consumer.nack(self(), message_id_to_ack)
end
end)
{:noreply, state}
end
Summary
Types
@type init_arg() :: term()
@type message_args() :: Pulsar.Message.t()
@type reason() :: term()
@type state() :: term()
Callbacks
@callback handle_call(term(), GenServer.from(), state()) :: {:reply, term(), state()} | {:reply, term(), state(), timeout() | :hibernate | {:continue, term()}} | {:noreply, state()} | {:noreply, state(), timeout() | :hibernate | {:continue, term()}} | {:stop, term(), term(), state()} | {:stop, term(), state()}