galena v0.1.2 Galena.ProducerConsumer behaviour

Galena.ProducerConsumer is a customized GenStage consumer-producer which is able to receive some messages from some producers or producers-consumers and send them to the consumers or producer-consumers that are subscribed. The producer-consumer will have the possibility to be subscribed to the chosen topics from the chosen producers.

Definition

defmodule MyProducerConsumer do
  use Galena.ProducerConsumer

  def handle_produce(topic, data) do
    result_topic = topic <> Integer.to_string(:rand.uniform(2))
    {result_topic, "modified by producer-consumer: " <> data}
  end
end

Start up

Define the args of your ProducerConsumer. It has to be a Keyword list which has to contain a producers_info field which will have a list of tuples of two parameters, where the first one will be a list of topics and the second one the producer or producer-consumer:

  args = [
    producers_info: [
      {["topic_1", "topic_2", "topic_3"], :producer1},
      {["topic_A"], :producer2},
      {["topic_a", "topic_b"], :producer3},
      {[], :producer4}
    ]
  ]

When the list of topics is empty, your producer-consumer will receive all the information published by the producer.

{:ok, producer_consumer} = MyProducerConsumer.start_link(args, [name: :prod_cons])

Summary

Callbacks

It will be executed just before a message is sent to the consumers (or producer-consumers)

Types

produced_message()
produced_message() :: any
produced_topic()
produced_topic() :: any
received_message()
received_message() :: any
subscribed_topic()
subscribed_topic() :: any

Callbacks

handle_produce(subscribed_topic, received_message)

It will be executed just before a message is sent to the consumers (or producer-consumers).

The inputs of the function are a topic (subscribed topic) and a message (received message). The output of that function has to be a tuple where the first parameter will be the topic (produced topic) and the second one the message (produced message).