View Source BroadwayRabbitMQ.ChannelPool behaviour (BroadwayRabbitMQ v0.8.1)

A behaviour used to implement custom AMQP channel pools.

By default, BroadwayRabbitMQ handles its own pool of AMQP connections and channels. However, there might be use cases where you want to use your own existing pool or custom pooling logic. In those cases, you can implement a custom channel pool using this behaviour.

To use a custom pool, pass {:custom_pool, module, args} as the value of the :connection option in BroadwayRabbitMQ.Producer. module needs to implement this behaviour, and args is passed down to checkout_channel/1 and checkin_channel/2.

Examples

Imagine we pass this option when starting the producer:

connection: {:custom_pool, MyPool, _amqp_connection = :big_pool}

Then, we could define the custom pool as:

defmodule MyPool do
  @behaviour BroadwayRabbitMQ.ChannelPool

  @impl true
  def checkout_channel(name) do
    conn = %AMQP.Connection{pid: Process.whereis(name)}

    case AMQP.Channel.open(conn) do
      {:ok, channel} -> {:ok, channel}
      {:error, reason} -> {:error, %RuntimeError{message: inspect(reason)}}
    end
  end

  @impl true
  def checkin_channel(_name, channel) do
    case AMQP.Channel.close(channel) do
      :ok -> :ok
      {:error, reason} -> {:error, %RuntimeError{message: inspect(reason)}}
    end
  end
end

Summary

Callbacks

Invoked to check a channel back into the pool.

Invoked to check out a AMQP channel from the pool.

Callbacks

Link to this callback

checkin_channel(args, channel)

View Source (since 0.8.0)
@callback checkin_channel(args :: term(), channel :: AMQP.Channel.t()) ::
  :ok | {:error, reason :: Exception.t()}

Invoked to check a channel back into the pool.

channel is a channel that was returned by checkout_channel/1.

If there is an error, you can return any exception as {:error, exception}.

In case your pool fails to handle this properly, BroadwayRabbitMQ will try to close the channel by itself using AMQP.Channel.close/1.

Link to this callback

checkout_channel(args)

View Source (since 0.8.0)
@callback checkout_channel(args :: term()) ::
  {:ok, AMQP.Channel.t()} | {:error, reason :: Exception.t()}

Invoked to check out a AMQP channel from the pool.

If there is an error, you can return any exception as {:error, exception}.

This callback is invoked from a BroadwayRabbitMQ.Producer process. If you need the PID of that process, call self() in your implementation. This can be useful for things like linking or monitoring.