Antenna (antenna v0.5.1)

View Source

Antenna is a mixture of Phoenix.PubSub and :gen_event functionality with some batteries included.

It implements back-pressure on top of GenStage, is fully conformant with OTP Design Principles. and is distributed out of the box.

Antenna supports both asynchronous and synchronous events. While the most preferrable way would be to stay fully async with Antenna.event/3, one still might propagate the event synchronously with Antenna.sync_event/3 and collect all the responses from all the handlers.

One can have as many isolated Antennas as necessary, distinguished by Antenna.t:id/0.

Getting Started

To start using Antenna, add it to your dependencies in mix.exs:

def deps do
  [
    {:antenna, "~> 0.3.0"}
  ]
end

Then add it to your supervision tree:

defmodule MyApp.Application do
  use Application

  def start(_type, _args) do
    children = [
      Antenna
    ]

    opts = [strategy: :one_for_one, name: MyApp.Supervisor]
    Supervisor.start_link(children, opts)
  end
end

Configuration

Antenna can be configured in your config.exs:

config :antenna,
  id: MyApp.Antenna,           # Optional custom identifier
  distributed: true,           # Enable distributed mode (default: true)
  sync_timeout: 5_000          # Default timeout for sync_event/4

Sequence Diagram

sequenceDiagram
  Consumer->>+Broadcaster: sync_event(channel, event)
  Consumer->>+Broadcaster: event(channel, event) 
  Broadcaster-->>+Consumer@Node1: event
  Broadcaster-->>+Consumer@Node2: event
  Broadcaster-->>+Consumer@NodeN: event
  Consumer@Node1-->>-NoOp: mine?
  Consumer@NodeN-->>-NoOp: mine?
  Consumer@Node2-->>+Matchers: event
  Matchers-->>+Handlers: handle_match/2
  Matchers-->>+Handlers: handle_match/2
  Handlers->>-Consumer: response(to: sync_event)

ASCII representation.

Common Use Cases

Basic Event Handling

# Define a matcher for specific events
Antenna.match(MyApp.Antenna, {:user_registered, user_id, _meta}, fn channel, event ->
  # Handle the event
  {:user_registered, user_id, meta} = event
  Logger.info("New user registered: #{user_id} on channel #{channel}")
  MyApp.UserNotifier.welcome(user_id)
end, channels: [:user_events])

# Send events
Antenna.event(MyApp.Antenna, [:user_events], {:user_registered, "user123", %{source: "web"}})

Error Handling

# Match on error events
Antenna.match(MyApp.Antenna, {:error, reason, _context} = event, fn channel, event ->
  # Log and handle errors
  Logger.error("Error on channel #{channel}: #{inspect(event)}")
  MyApp.ErrorTracker.track(event)
end, channels: [:errors])

# Send error events
Antenna.event(MyApp.Antenna, [:errors], {:error, :validation_failed, %{user_id: 123}})

Distributed Setup

Antenna automatically handles distribution when nodes are connected:

# On node1@host1
Node.connect(:"node2@host2")
Antenna.match(MyApp.Antenna, {:replicate, data}, fn _, {:replicate, data} ->
  MyApp.Storage.replicate(data)
end, channels: [:replication])

# On node2@host2
Antenna.event(MyApp.Antenna, [:replication], {:replicate, some_data})
# The event will be processed on both nodes

Custom Matchers

You can use pattern matching and guards in your matchers:

# Match on specific patterns with guards
Antenna.match(MyApp.Antenna, 
  {:temperature, celsius, _} when celsius > 30,
  fn _, {:temperature, c, location} ->
    Logger.warning("High temperature (#{c}°C) detected at #{location}")
  end,
  channels: [:sensors])

# Match on map patterns
Antenna.match(MyApp.Antenna,
  %{event: :payment, amount: amount} when amount > 1000,
  fn _, event ->
    MyApp.LargePaymentProcessor.handle(event)
  end,
  channels: [:payments])

Usage Example

The consumer of this library is supposed to declare one or more matchers, subscribing to one or more channels, and then call Antenna.event/2 to propagate the event.

assert {:ok, _pid, "{:tag_1, a, _} when is_nil(a)"} =
  Antenna.match(Antenna, {:tag_1, a, _} when is_nil(a), self(), channels: [:chan_1])

assert :ok = Antenna.event(Antenna, [:chan_1], {:tag_1, nil, 42})
assert_receive {:antenna_event, :chan_1, {:tag_1, nil, 42}}

Architecture

Antenna uses a distributed pub/sub architecture with the following components:

  1. Broadcaster - Handles event distribution across nodes
  2. Matchers - Pattern match on events and delegate to handlers
  3. Guard - Manages channel subscriptions and handler registration
  4. PubSub - Implements the core pub/sub functionality

Events flow through the system as follows:

  1. Client calls event/3 or sync_event/4
  2. Broadcaster distributes the event to all nodes
  3. Matchers on each node pattern match against the event
  4. Matching handlers process the event
  5. For sync_event, responses are collected and returned

Best Practices

  1. Channel Organization

    • Use atoms for channel names
    • Group related events under common channels
    • Consider using hierarchical channel names (e.g., :users_created, :users_updated)
  2. Pattern Matching

    • Use specific patterns to avoid unnecessary pattern matches
    • Include guards for more precise matching
    • Consider the order of pattern matches when using multiple matchers
  3. Handler Design

    • Keep handlers focused and single-purpose
    • Use sync_event/4 when you need responses
    • Consider timeouts for sync operations
    • Handle errors within handlers to prevent cascade failures
  4. Performance

    • Use async events (event/3) when possible
    • Keep handler processing time minimal
    • Consider using separate processes for long-running operations
    • Monitor matcher and handler counts
  5. Testing

    • Test matchers with various event patterns
    • Verify handler behavior
    • Test distributed scenarios
    • Use ExUnit's async: true when possible

Summary

Functions (Client)

Sends an event to all the associated matchers through channels.

Sends an event to all the associated matchers through channels synchronously, collecting and returning responses from all handlers.

Functions (Setup)

Returns a specification to start this module under a supervisor.

Adds a handler to the matcher process specified by pid

Declares a matcher for tagged events.

Starts the Antenna matcher tree for the id given.

Subscribes a matcher process specified by pid to a channel(s)

Removes a handler from the matcher process specified by pid

Undeclares a matcher for tagged events previously declared with Antenna.match/4.

Unsubscribes a previously subscribed matcher process specified by pid from the channel(s)

Functions (Internals)

Returns a map of matches to matchers

Types

The identifier of the channel, messages can be sent to, preferrably atom()

The event being sent to the listeners

The actual handler to be associated with an event(s). It might be either a function or a process id, in which case the message of a following shape will be sent to it.

The identifier of the isolated Antenna

The matcher to be used for the event, e.g. {:tag_1, a, _} when is_nil(a)

The options to be passed to Antenna.match/4 and Antenna.unmatch/2

Functions (Client)

event(id \\ Antenna, channels, event)

@spec event(id :: id(), channels :: channel() | [channel()], event :: event()) :: :ok

Sends an event to all the associated matchers through channels.

The special :* might be specified as channels, then the event will be sent to all the registered channels.

If one wants to collect results of all the registered event handlers, they should look at sync_event/3 instead.

Examples

Basic Usage

# Send event to a single channel
Antenna.event(MyApp.Antenna, :user_events, {:user_logged_in, "user123"})

# Send event to multiple channels
Antenna.event(MyApp.Antenna, [:logs, :metrics], {:api_call, "/users", 200, 45})

# Send event to all channels
Antenna.event(MyApp.Antenna, :*, {:system_notification, :service_starting})

Custom Antenna ID

# Using a specific Antenna instance
Antenna.event(MyBackgroundJobs.Antenna, :jobs, {:job_completed, "job-123"})

sync_event(id \\ Antenna, channels, event, timeout \\ 5000, extra_timeout \\ 100)

@spec sync_event(
  id :: id(),
  channels :: channel() | [channel()],
  event :: event(),
  timeout :: timeout(),
  extra_timeout :: timeout()
) :: [term()]

Sends an event to all the associated matchers through channels synchronously, collecting and returning responses from all handlers.

The special :* might be specified as channels, then the event will be sent to all the registered channels.

Unlike event/3, this function waits for all handlers to process the event and returns their responses.

Parameters

  • id - The Antenna instance identifier (optional, defaults to configured value)
  • channels - Single channel or list of channels to send the event to
  • event - The event to be sent
  • timeout - Maximum time to wait for responses (default: 5000ms)

Examples

Basic Usage

# Send sync event and collect responses
results = Antenna.sync_event(MyApp.Antenna, :user_events, {:verify_user, "user123"})

# Send to multiple channels with custom timeout
results = Antenna.sync_event(
  MyApp.Antenna,
  [:validations, :security],
  {:user_action, "user123", :delete_account},
  10_000
)

Response Format

The function returns a list of handler responses, each wrapped in a tuple indicating whether the pattern matched and including the handler's result:

[
  {:match, %{
    match: "pattern_string",
    channel: :channel_name,
    results: [handler_result]
  }},
  {:no_match, :channel_name}
]

Functions (Setup)

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

See Supervisor.

handle(id \\ Antenna, handlers, pid)

@spec handle(id :: id(), handlers :: handler() | [handler()] | MapSet.t(), pid()) ::
  :ok

Adds a handler to the matcher process specified by pid

match(id \\ Antenna, match, handlers, opts \\ [])

(macro)

Declares a matcher for tagged events.

This function sets up a pattern matcher to handle specific events on the given channels. When an event matching the pattern is sent to any of the channels, the provided handler will be invoked.

Options

  • :channels - A list of channels to subscribe this matcher to
  • :once? - If true, the matcher will be automatically unmatched after the first match (default: false)

Examples

Basic Usage

Antenna.match(Antenna, %{tag: _, success: false}, fn channel, message ->
  Logger.warning("The processing failed for [" <> 
    inspect(channel) <> "], result: " <> inspect(message))
end, channels: [:rabbit])

One-time Matcher

# This matcher will only trigger once and then be removed
Antenna.match(Antenna, {:init, pid}, fn _, {:init, pid} ->
  send(pid, :initialized)
end, channels: [:system], once?: true)

Using Pattern Guards

Antenna.match(Antenna, 
  {:metric, name, value} when value > 100, 
  fn channel, event ->
    Logger.info("High metric value on #{channel}: #{inspect(event)}")
  end, 
  channels: [:metrics])

Multiple Channels

Antenna.match(Antenna, 
  {:user_event, user_id, _action},
  fn channel, event ->
    # Handle user events from multiple channels
    MyApp.UserEventTracker.track(channel, event)
  end,
  channels: [:user_logins, :user_actions, :user_settings])

start_link(init_arg \\ [])

@spec start_link([{:id, atom()}]) :: Supervisor.on_start()

Starts the Antenna matcher tree for the id given.

subscribe(id \\ Antenna, channels, pid)

@spec subscribe(id :: id(), channels :: channel() | [channel()] | MapSet.t(), pid()) ::
  :ok

Subscribes a matcher process specified by pid to a channel(s)

unhandle(id \\ Antenna, handlers, pid)

@spec unhandle(id :: id(), handlers :: handler() | [handler()] | MapSet.t(), pid()) ::
  :ok

Removes a handler from the matcher process specified by pid

unmatch(id \\ Antenna, match)

(macro)

Undeclares a matcher for tagged events previously declared with Antenna.match/4.

Accepts both an original match or a name returned by Antenna.match/4, which is effectively Macro.to_string(match).

Example

Antenna.unmatch(Antenna, %{tag: _, success: false})

unsubscribe(id \\ Antenna, channels, pid)

@spec unsubscribe(id :: id(), channels :: channel() | [channel()] | MapSet.t(), pid()) ::
  :ok

Unsubscribes a previously subscribed matcher process specified by pid from the channel(s)

Functions (Internals)

registered_matchers(id)

@spec registered_matchers(id :: id()) :: %{
  required(term()) => {pid(), Supervisor.child_spec()}
}

Returns a map of matches to matchers

Types

channel()

@type channel() :: atom() | term()

The identifier of the channel, messages can be sent to, preferrably atom()

event()

@type event() :: term()

The event being sent to the listeners

handler()

@type handler() ::
  (event() -> term())
  | (channel(), event() -> term())
  | Antenna.Matcher.t()
  | pid()
  | GenServer.name()

The actual handler to be associated with an event(s). It might be either a function or a process id, in which case the message of a following shape will be sent to it.

{:antenna_event, channel, event}

id()

@type id() :: module()

The identifier of the isolated Antenna

matcher()

@type matcher() :: term()

The matcher to be used for the event, e.g. {:tag_1, a, _} when is_nil(a)

The pattern matching is done on the Antenna process, so it might be either a function or a process id.

opts()

@type opts() :: [id: id(), channels: channel() | [channel()], once?: boolean()]

The options to be passed to Antenna.match/4 and Antenna.unmatch/2

  • :id - the identifier of the Antenna, defaults to Antenna
  • :channels - a list of channels to subscribe the matcher to
  • :once? - if true, the matcher would be removed after the first match