Peluquero (peluquero v0.99.43) View Source
Application handler for the Peluquero
application.
Peluquero
is the multi-purpose, multi-protocol message handling utility.
It provides the transparent way to subscribe to rabbit queues, re-shape the
messages and spit out everything to other queues. It also supports transparent
redis connection for those who need to store the data in redis.
Basically, one would configure Peluquero
through [consul
](https://consul.io/)
(explicit configuration is also allowed, though I would strongly encourage users
to use consul
for the configuration,) and declare so-called scissors
to
mutate the messages from sources before putting them into destinations.
Multiple sources as well as multiple destinations are allowed. Example of the configuration that reads two different message queues, transforms data and spits it out to the “common” queue, that might be used by subscribers (join different sources):
config :peluquero, :peluquerias, [
football_scores: [
scissors: [{MyModule, :my_function}],
consul: "configuration/football",
pool: [actors: [size: 50, max_overflow: 100]]]]
The configuration above would read the settings for rabbits from consul (under
football
key,) subscribe to the rabbit using the pool of 50
workers and
call MyModule.my_function/1
on each subsequent message received from sources.
As MyModule.my_function/1
returns, the value it returned will be placed into
all the destination queues, specified in consul
(if it returns nil
, the
original value will be used.) The tree structure of consul
settings would be as follows:
configuration/football/
destinations/
exchangeY/
routing_key ⇒ transformed
exchangeZ/
rabbit/
host ⇒ localhost
password ⇒ my_rabbit_password
port ⇒ 5672
user ⇒ my_rabbit_user
virtual_host ⇒ my_virtual_host
x_message_ttl ⇒ 4000
sources/
exchangeA/
prefetch_count ⇒ 30
routing_key ⇒ to_transform
exchangeB/
prefetch_count ⇒ 50
queue ⇒ queue_name
routing_key ⇒ to_transform
redis/
host ⇒ localhost
port ⇒ 11887
database ⇒ 0
pwd ⇒ my_redis_password
my_module_1.ex
Peluquero.Peluqueria.scissors!(:p1, &IO.puts/1) # adds another handler in runtime
Peluquero.Peluqueria.scissors!(:p2, fn payload ->
payload
|> Jason.decode!
|> Map.put(:timestamp, DateTime.utc_now())
|> Jason.encode! # if this transformer is last, it’s safe to return a term
end) # adds another handler in runtime, to :p2 named instance
The result of the above would be:
- direct exchanges
exchangeA
andexchangeB
would be consumed withrouting_key
beingto_transform
; - all the messages will be put to
stdout
twice (one withIO.inspect
, configured inconfig.exs
and another withIO.puts
, attached in runtime); - all the messages will be extended with new
:timestamp
field; - all the messages will be published to direct
exchangeY
withrouting_key
being set totransformed
and to fanout exchangeexchangeZ
.
Handlers might be added in runtime using Peluquero.handler!/1
, that accepts
any type of transformers described above. Handlers are appended to the list.
Maybe later this function would accept an optional parameter, saying whether
the handler should be appended, or prepended.
Also, Peluquero
simplifies the Redis
access: all the connection boilerplate
is handler by Peluquero
, allowing storing a Redis
values based on streaming
queue (e.g. “current value”) as easy as declaring one scissors
delegating to
Peluquero.Peinados.set/3
.