Elixir Client for Apache Pulsar

[!TIP] Using Broadway? Check out the companion project: off_broadway_pulsar.

An Elixir client for Apache Pulsar.

Installation

Add :pulsar_elixir to your dependencies in mix.exs:

def deps do
  [
    {:pulsar, "~> 2.8.13", hex: :pulsar_elixir}
  ]
end

Quick Start

Assuming you have Pulsar running on localhost:6650, the quickest way to consume messages from a Pulsar topic is to use the Reader interface, as shown below:

"persistent://my-tenant/my-namespace/my-topic"
|> Pulsar.Reader.stream(host: "pulsar://localhost:6650", timeout: 100)
|> Enum.map(fn msg -> String.to_integer(msg.payload) end)
|> Enum.filter(fn n -> rem(n, 2) == 0 end)
|> Enum.map(fn n -> n * 2 end)
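
The transformation stages of this pipeline can be tried without a broker. The following is a minimal sketch, assuming each message exposes a `payload` field holding a numeric string, as in the example above. `PayloadPipeline` and the sample messages are hypothetical and not part of the client. It uses `Stream` instead of `Enum` so that each stage stays lazy until the result is collected.

```elixir
defmodule PayloadPipeline do
  # Hypothetical helper for illustration; not part of the Pulsar client.
  # Accepts any enumerable of maps or structs with a :payload field.
  def double_evens(messages) do
    messages
    |> Stream.map(fn msg -> String.to_integer(msg.payload) end)
    |> Stream.filter(fn n -> rem(n, 2) == 0 end)
    |> Stream.map(fn n -> n * 2 end)
  end
end

# In-memory stand-ins for messages read from a topic.
messages = [%{payload: "1"}, %{payload: "2"}, %{payload: "3"}, %{payload: "4"}]

result = messages |> PayloadPipeline.double_evens() |> Enum.to_list()
IO.inspect(result)
# prints [4, 8]
```

Note that `String.to_integer/1` raises on a non-numeric payload, so a real pipeline may want to validate payloads first.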

For more complex scenarios, first implement a basic consumer like the one below:

defmodule MyPulsarConsumer do
  use Pulsar.Consumer.Callback

  def handle_message(message, state) do
    IO.puts("Received: #{message.payload}")
    {:ok, state}
  end
end
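
Because `handle_message/2` receives the current state and returns `{:ok, state}`, a callback can carry data from one message to the next. The sketch below illustrates that idea with a message counter. It assumes the state is a map, which may not match what the library actually passes in, and it leaves out `use Pulsar.Consumer.Callback` so that it runs without the library. `CountingHandler` and the sample messages are hypothetical.

```elixir
defmodule CountingHandler do
  # Hypothetical example. A real consumer would also
  # `use Pulsar.Consumer.Callback`, as shown above.
  # Assumption: the state is a map.
  def handle_message(message, state) do
    count = Map.get(state, :count, 0) + 1
    IO.puts("Received message #{count}: #{message.payload}")
    {:ok, Map.put(state, :count, count)}
  end
end

# Simulate the consumer loop by threading the state through two messages.
final_state =
  Enum.reduce([%{payload: "a"}, %{payload: "b"}], %{}, fn msg, state ->
    {:ok, new_state} = CountingHandler.handle_message(msg, state)
    new_state
  end)

IO.inspect(final_state)
# prints %{count: 2}
```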

You can start producing and consuming messages with the following configuration:

config :pulsar,
  host: "pulsar://localhost:6650",
  consumers: [
    my_consumer: [
      topic: "persistent://my-tenant/my-namespace/my-topic",
      subscription_name: "my-subscription",
      callback_module: MyPulsarConsumer
    ]
  ],
  producers: [
    my_producer: [
      topic: "persistent://my-tenant/my-namespace/my-topic"
    ]
  ]

Sending a message using the configured producer can be done as follows:

Pulsar.send(:my_producer, "Hello, Pulsar!")
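
To feed the Reader example from the Quick Start, integers can be published as string payloads. The sketch below assumes payloads are sent as binaries, as in the call above. `NumberPublisher` is a hypothetical helper, and the sender is passed in as a function so the example runs without a broker.

```elixir
defmodule NumberPublisher do
  # Hypothetical helper for illustration; not part of the Pulsar client.
  # `send_fun` is any 2-arity function that takes a producer name
  # and a binary payload.
  def publish(numbers, producer, send_fun) do
    Enum.each(numbers, fn n ->
      send_fun.(producer, Integer.to_string(n))
    end)
  end
end

# Stand-in sender that prints instead of talking to a broker.
fake_send = fn producer, payload -> IO.puts("#{producer} <- #{payload}") end
NumberPublisher.publish(1..3, :my_producer, fake_send)

# With the client configured and started, the real sender could be
# passed instead:
# NumberPublisher.publish(1..3, :my_producer, &Pulsar.send/2)
```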

By default, brokers, consumers and producers are started within the scope of the :default client, but you can also configure multiple clients (which may come in handy if you need to connect to multiple clusters).

clients: [
  client_1: [
    host: "pulsar://host.cluster1.com:6650"
  ],
  client_2: [
    host: "pulsar://host.cluster2.com:6650"
  ]
]

Then, you can specify the client in the consumer or producer configuration using the client key, e.g. client: :client_1.

producers: [
  my_producer_1: [
    client: :client_1,
    topic: "persistent://my-tenant/my-namespace/my-topic"
  ]
]
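
Put together, a multi-cluster setup might look like the sketch below. It assumes the `clients` and `producers` keys sit side by side under `config :pulsar`, in the same way as `host`, `consumers` and `producers` in the single-client configuration. `my_producer_2` is a hypothetical second producer added for illustration.

```elixir
# config/config.exs
import Config

config :pulsar,
  clients: [
    client_1: [host: "pulsar://host.cluster1.com:6650"],
    client_2: [host: "pulsar://host.cluster2.com:6650"]
  ],
  producers: [
    # Publishes to cluster 1.
    my_producer_1: [
      client: :client_1,
      topic: "persistent://my-tenant/my-namespace/my-topic"
    ],
    # Hypothetical second producer, publishing to cluster 2.
    my_producer_2: [
      client: :client_2,
      topic: "persistent://my-tenant/my-namespace/my-topic"
    ]
  ]
```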

If your Pulsar cluster requires authentication, you can configure it in the client using the auth key:

auth: [
  type: Pulsar.Auth.OAuth2,
  opts: [
    client_id: "<YOUR-OAUTH2-CLIENT-ID>",
    client_secret: "<YOUR-OAUTH2-CLIENT-SECRET>",
    site: "<YOUR-OAUTH2-ISSUER-URL>",
    audience: "<YOUR-OAUTH2-AUDIENCE>"
  ]
]
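
One way to keep credentials out of version control is to read them from the environment at runtime. The sketch below assumes the `auth` key is nested inside a named client next to `host`. The environment variable names are hypothetical.

```elixir
# config/runtime.exs
import Config

config :pulsar,
  clients: [
    client_1: [
      host: "pulsar://host.cluster1.com:6650",
      auth: [
        type: Pulsar.Auth.OAuth2,
        opts: [
          # Hypothetical variable names; fetch_env!/1 raises at boot
          # if a variable is missing.
          client_id: System.fetch_env!("PULSAR_OAUTH2_CLIENT_ID"),
          client_secret: System.fetch_env!("PULSAR_OAUTH2_CLIENT_SECRET"),
          site: System.fetch_env!("PULSAR_OAUTH2_ISSUER_URL"),
          audience: System.fetch_env!("PULSAR_OAUTH2_AUDIENCE")
        ]
      ]
    ]
  ]
```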

Testing

[!IMPORTANT] Do not forget to add the following line to your /etc/hosts file before running the tests:

127.0.0.1 broker1 broker2

To run the tests, run the following command:

mix test

To run only a subset of tests, specify the file that contains them:

mix test test/integration/consumer_test.exs

You can also run an individual test by passing the line number where it is defined:

mix test test/integration/consumer_test.exs:43

The examples directory includes a number of examples that demonstrate the use of the Pulsar client. For example:

mix run --no-start examples/bingo.exs

Features

The full feature matrix for Apache Pulsar clients is published in the official Apache Pulsar documentation.

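| Component | Feature | Supported |
|-----------|---------|-----------|
| Client | TLS encryption | |
| Client | Authentication | ⚠️ |
| Client | Transaction | |
| Client | Statistics | |
| Producer | Sync send | |
| Producer | Async send | |
| Producer | Batching | |
| Producer | Chunking | |
| Producer | Compression | |
| Producer | Schema | |
| Producer | Partitioned topics | |
| Producer | Access modes | |
| Consumer | ACK | |
| Consumer | Batch-index ACK | |
| Consumer | NACK | |
| Consumer | NACK back-off | |
| Consumer | Batching | |
| Consumer | Partitioned topics | |
| Consumer | Chunking | |
| Consumer | Seek | |
| Consumer | Subscription types | |
| Consumer | Subscription modes | |
| Consumer | Retry letter topic | |
| Consumer | Dead letter topic | |
| Consumer | Compression | |
| Consumer | Compaction | |
| Consumer | Schema | |
| Consumer | Configurable flow control settings | |
| Reader | | |
| TableView | | |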