Antenna (antenna v0.5.1)
View SourceAntenna
is a mixture of Phoenix.PubSub
and :gen_event
functionality
with some batteries included.
It implements back-pressure on top of GenStage
, is fully conformant with
OTP Design Principles. and
is distributed out of the box.
Antenna
supports both asynchronous and synchronous events. While the most preferrable way
would be to stay fully async with Antenna.event/3
, one still might propagate the event
synchronously with Antenna.sync_event/3
and collect all the responses from all the handlers.
One can have as many isolated Antenna
s as necessary, distinguished by Antenna.t:id/0
.
Getting Started
To start using Antenna, add it to your dependencies in mix.exs:
def deps do
[
{:antenna, "~> 0.3.0"}
]
end
Then add it to your supervision tree:
defmodule MyApp.Application do
use Application
def start(_type, _args) do
children = [
Antenna
]
opts = [strategy: :one_for_one, name: MyApp.Supervisor]
Supervisor.start_link(children, opts)
end
end
Configuration
Antenna can be configured in your config.exs:
config :antenna,
id: MyApp.Antenna, # Optional custom identifier
distributed: true, # Enable distributed mode (default: true)
sync_timeout: 5_000 # Default timeout for sync_event/4
Sequence Diagram
sequenceDiagram
Consumer->>+Broadcaster: sync_event(channel, event)
Consumer->>+Broadcaster: event(channel, event)
Broadcaster-->>+Consumer@Node1: event
Broadcaster-->>+Consumer@Node2: event
Broadcaster-->>+Consumer@NodeN: event
Consumer@Node1-->>-NoOp: mine?
Consumer@NodeN-->>-NoOp: mine?
Consumer@Node2-->>+Matchers: event
Matchers-->>+Handlers: handle_match/2
Matchers-->>+Handlers: handle_match/2
Handlers->>-Consumer: response(to: sync_event)
Common Use Cases
Basic Event Handling
# Define a matcher for specific events
Antenna.match(MyApp.Antenna, {:user_registered, user_id, _meta}, fn channel, event ->
# Handle the event
{:user_registered, user_id, meta} = event
Logger.info("New user registered: #{user_id} on channel #{channel}")
MyApp.UserNotifier.welcome(user_id)
end, channels: [:user_events])
# Send events
Antenna.event(MyApp.Antenna, [:user_events], {:user_registered, "user123", %{source: "web"}})
Error Handling
# Match on error events
Antenna.match(MyApp.Antenna, {:error, reason, _context} = event, fn channel, event ->
# Log and handle errors
Logger.error("Error on channel #{channel}: #{inspect(event)}")
MyApp.ErrorTracker.track(event)
end, channels: [:errors])
# Send error events
Antenna.event(MyApp.Antenna, [:errors], {:error, :validation_failed, %{user_id: 123}})
Distributed Setup
Antenna automatically handles distribution when nodes are connected:
# On node1@host1
Node.connect(:"node2@host2")
Antenna.match(MyApp.Antenna, {:replicate, data}, fn _, {:replicate, data} ->
MyApp.Storage.replicate(data)
end, channels: [:replication])
# On node2@host2
Antenna.event(MyApp.Antenna, [:replication], {:replicate, some_data})
# The event will be processed on both nodes
Custom Matchers
You can use pattern matching and guards in your matchers:
# Match on specific patterns with guards
Antenna.match(MyApp.Antenna,
{:temperature, celsius, _} when celsius > 30,
fn _, {:temperature, c, location} ->
Logger.warning("High temperature (#{c}°C) detected at #{location}")
end,
channels: [:sensors])
# Match on map patterns
Antenna.match(MyApp.Antenna,
%{event: :payment, amount: amount} when amount > 1000,
fn _, event ->
MyApp.LargePaymentProcessor.handle(event)
end,
channels: [:payments])
Usage Example
The consumer of this library is supposed to declare one or more matchers, subscribing to one
or more channels, and then call Antenna.event/2
to propagate the event.
assert {:ok, _pid, "{:tag_1, a, _} when is_nil(a)"} =
Antenna.match(Antenna, {:tag_1, a, _} when is_nil(a), self(), channels: [:chan_1])
assert :ok = Antenna.event(Antenna, [:chan_1], {:tag_1, nil, 42})
assert_receive {:antenna_event, :chan_1, {:tag_1, nil, 42}}
Architecture
Antenna uses a distributed pub/sub architecture with the following components:
- Broadcaster - Handles event distribution across nodes
- Matchers - Pattern match on events and delegate to handlers
- Guard - Manages channel subscriptions and handler registration
- PubSub - Implements the core pub/sub functionality
Events flow through the system as follows:
- Client calls
event/3
orsync_event/4
- Broadcaster distributes the event to all nodes
- Matchers on each node pattern match against the event
- Matching handlers process the event
- For sync_event, responses are collected and returned
Best Practices
Channel Organization
- Use atoms for channel names
- Group related events under common channels
- Consider using hierarchical channel names (e.g., :users_created, :users_updated)
Pattern Matching
- Use specific patterns to avoid unnecessary pattern matches
- Include guards for more precise matching
- Consider the order of pattern matches when using multiple matchers
Handler Design
- Keep handlers focused and single-purpose
- Use
sync_event/4
when you need responses - Consider timeouts for sync operations
- Handle errors within handlers to prevent cascade failures
Performance
- Use async events (
event/3
) when possible - Keep handler processing time minimal
- Consider using separate processes for long-running operations
- Monitor matcher and handler counts
- Use async events (
Testing
- Test matchers with various event patterns
- Verify handler behavior
- Test distributed scenarios
- Use ExUnit's async: true when possible
Summary
Functions (Client)
Sends an event to all the associated matchers through channels.
Sends an event to all the associated matchers through channels synchronously, collecting and returning responses from all handlers.
Functions (Setup)
Returns a specification to start this module under a supervisor.
Adds a handler to the matcher process specified by pid
Declares a matcher for tagged events.
Starts the Antenna
matcher tree for the id
given.
Subscribes a matcher process specified by pid
to a channel(s)
Removes a handler from the matcher process specified by pid
Undeclares a matcher for tagged events previously declared with Antenna.match/4
.
Unsubscribes a previously subscribed matcher process specified by pid
from the channel(s)
Functions (Internals)
Returns a map of matches to matchers
Types
The identifier of the channel, messages can be sent to, preferrably atom()
The event being sent to the listeners
The actual handler to be associated with an event(s). It might be either a function or a process id, in which case the message of a following shape will be sent to it.
The matcher to be used for the event, e.g. {:tag_1, a, _} when is_nil(a)
The options to be passed to Antenna.match/4
and Antenna.unmatch/2
Functions (Client)
Sends an event to all the associated matchers through channels.
The special :*
might be specified as channels, then the event
will be sent to all the registered channels.
If one wants to collect results of all the registered event handlers,
they should look at sync_event/3
instead.
Examples
Basic Usage
# Send event to a single channel
Antenna.event(MyApp.Antenna, :user_events, {:user_logged_in, "user123"})
# Send event to multiple channels
Antenna.event(MyApp.Antenna, [:logs, :metrics], {:api_call, "/users", 200, 45})
# Send event to all channels
Antenna.event(MyApp.Antenna, :*, {:system_notification, :service_starting})
Custom Antenna ID
# Using a specific Antenna instance
Antenna.event(MyBackgroundJobs.Antenna, :jobs, {:job_completed, "job-123"})
@spec sync_event( id :: id(), channels :: channel() | [channel()], event :: event(), timeout :: timeout(), extra_timeout :: timeout() ) :: [term()]
Sends an event to all the associated matchers through channels synchronously, collecting and returning responses from all handlers.
The special :*
might be specified as channels, then the event
will be sent to all the registered channels.
Unlike event/3
, this function waits for all handlers to process the event
and returns their responses.
Parameters
id
- The Antenna instance identifier (optional, defaults to configured value)channels
- Single channel or list of channels to send the event toevent
- The event to be senttimeout
- Maximum time to wait for responses (default: 5000ms)
Examples
Basic Usage
# Send sync event and collect responses
results = Antenna.sync_event(MyApp.Antenna, :user_events, {:verify_user, "user123"})
# Send to multiple channels with custom timeout
results = Antenna.sync_event(
MyApp.Antenna,
[:validations, :security],
{:user_action, "user123", :delete_account},
10_000
)
Response Format
The function returns a list of handler responses, each wrapped in a tuple indicating whether the pattern matched and including the handler's result:
[
{:match, %{
match: "pattern_string",
channel: :channel_name,
results: [handler_result]
}},
{:no_match, :channel_name}
]
Functions (Setup)
Returns a specification to start this module under a supervisor.
See Supervisor
.
Adds a handler to the matcher process specified by pid
Declares a matcher for tagged events.
This function sets up a pattern matcher to handle specific events on the given channels. When an event matching the pattern is sent to any of the channels, the provided handler will be invoked.
Options
:channels
- A list of channels to subscribe this matcher to:once?
- If true, the matcher will be automatically unmatched after the first match (default: false)
Examples
Basic Usage
Antenna.match(Antenna, %{tag: _, success: false}, fn channel, message ->
Logger.warning("The processing failed for [" <>
inspect(channel) <> "], result: " <> inspect(message))
end, channels: [:rabbit])
One-time Matcher
# This matcher will only trigger once and then be removed
Antenna.match(Antenna, {:init, pid}, fn _, {:init, pid} ->
send(pid, :initialized)
end, channels: [:system], once?: true)
Using Pattern Guards
Antenna.match(Antenna,
{:metric, name, value} when value > 100,
fn channel, event ->
Logger.info("High metric value on #{channel}: #{inspect(event)}")
end,
channels: [:metrics])
Multiple Channels
Antenna.match(Antenna,
{:user_event, user_id, _action},
fn channel, event ->
# Handle user events from multiple channels
MyApp.UserEventTracker.track(channel, event)
end,
channels: [:user_logins, :user_actions, :user_settings])
@spec start_link([{:id, atom()}]) :: Supervisor.on_start()
Starts the Antenna
matcher tree for the id
given.
Subscribes a matcher process specified by pid
to a channel(s)
Removes a handler from the matcher process specified by pid
Undeclares a matcher for tagged events previously declared with Antenna.match/4
.
Accepts both an original match or a name returned by Antenna.match/4
,
which is effectively Macro.to_string(match)
.
Example
Antenna.unmatch(Antenna, %{tag: _, success: false})
Unsubscribes a previously subscribed matcher process specified by pid
from the channel(s)
Functions (Internals)
@spec registered_matchers(id :: id()) :: %{ required(term()) => {pid(), Supervisor.child_spec()} }
Returns a map of matches to matchers
Types
The identifier of the channel, messages can be sent to, preferrably atom()
@type event() :: term()
The event being sent to the listeners
@type handler() :: (event() -> term()) | (channel(), event() -> term()) | Antenna.Matcher.t() | pid() | GenServer.name()
The actual handler to be associated with an event(s). It might be either a function or a process id, in which case the message of a following shape will be sent to it.
{:antenna_event, channel, event}
@type id() :: module()
The identifier of the isolated Antenna
@type matcher() :: term()
The matcher to be used for the event, e.g. {:tag_1, a, _} when is_nil(a)
The pattern matching is done on the Antenna
process, so it might be
either a function or a process id.
The options to be passed to Antenna.match/4
and Antenna.unmatch/2