ex_nsq v1.1.0 NSQ.Producer

A producer is a process that connects to one or many NSQDs and publishes messages.

Interface

To initialize:

{:ok, producer} = NSQ.Producer.Supervisor.start_link("the-default-topic", %NSQ.Config{
  nsqds: ["127.0.0.1:6750"]
})

The default topic argument is required, even if you plan on explicitly publishing to a different topic. If you don’t plan on using it, you can set it to a placeholder such as _default_topic_.

If you provide more than one nsqd, each pub/mpub will choose one randomly.
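
The library's own selection code is not shown on this page. As a minimal sketch of the idea, assuming connections are held as {id, pid} tuples (the connection() type documented below), a random pick could look like this. The module and function names are hypothetical and not part of ex_nsq.

```elixir
defmodule RandomConnectionSketch do
  # Hypothetical helper, not part of ex_nsq.
  # `connections` mirrors the documented connection() shape: {id, pid}.

  # With no connections there is nothing to pick from.
  def pick([]), do: {:error, :no_connections}

  # Enum.random/1 picks one element uniformly at random.
  def pick(connections) when is_list(connections) do
    {_id, pid} = Enum.random(connections)
    {:ok, pid}
  end
end
```

Because every call picks independently, consecutive publishes may land on different nsqds, so ordering across calls should not be assumed.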

Note that, unlike consumers, producers cannot be configured to use discovery with nsqlookupd. This is because discovery requires a topic and channel, and an nsqd will only appear in nsqlookupd if it has already published messages on that topic. So there’s a chicken-and-egg problem. The recommended solution is to run NSQD on the same box where you’re publishing, so your address is always 127.0.0.1 with a static port.

pub

Publish a single message to NSQD.

NSQ.Producer.pub(producer, "a message")
NSQ.Producer.pub(producer, "different-topic", "a message")

mpub

Publish multiple messages to NSQD atomically.

NSQ.Producer.mpub(producer, ["one", "two"])
NSQ.Producer.mpub(producer, "different-topic", ["one", "two"])
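
Atomicity applies to a single mpub call. If a large list is split across several calls, each batch is atomic on its own but the set as a whole is not. The sketch below shows one way to batch under that assumption. BatchPublishSketch and publish_fun are hypothetical names, not part of ex_nsq; publish_fun stands in for a call such as fn batch -> NSQ.Producer.mpub(producer, batch) end.

```elixir
defmodule BatchPublishSketch do
  # Hypothetical helper, not part of ex_nsq.
  # publish_fun is a one-arity function that publishes a list of messages
  # and is assumed to return {:ok, response} on success.
  def publish_in_batches(messages, batch_size, publish_fun)
      when is_list(messages) and is_integer(batch_size) and batch_size > 0 and
             is_function(publish_fun, 1) do
    messages
    |> Enum.chunk_every(batch_size)
    |> Enum.reduce_while({:ok, 0}, fn batch, {:ok, sent} ->
      case publish_fun.(batch) do
        # Keep a running count of messages published so far.
        {:ok, _response} -> {:cont, {:ok, sent + length(batch)}}
        # Stop at the first batch that does not succeed and report progress.
        other -> {:halt, {:error, other, sent}}
      end
    end)
  end
end
```

On failure the third tuple element tells the caller how many messages were already published, which is the point from which a retry would need to resume.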

Summary

Types

connection()
A tuple with a string ID (used to target the connection in NSQ.Connection.Supervisor) and a PID of the connection

host_with_port()
A tuple with a host and a port

pro_state()
A map, but we can be more specific by asserting some entries that should be set for a connection’s state map

Functions

connect_to_nsqds(nsqds, pro, pro_state)
Create supervised connections to NSQD

get(sup_pid)
The end-user will be targeting the supervisor, but it’s the producer that can actually handle the command

get_state(producer)
Get the current state of a producer. Used in tests. Not for external use

mpub(sup_pid, data)
Publish data to whatever topic is the default

mpub(sup_pid, topic, data)
Publish data to a specific topic

pub(sup_pid, data)
Publish data to whatever topic is the default

pub(sup_pid, topic, data)
Publish data to a specific topic

Types

connection()
connection() :: {String.t, pid}

A tuple with a string ID (used to target the connection in NSQ.Connection.Supervisor) and a PID of the connection.

host_with_port()
host_with_port() :: {String.t, integer}

A tuple with a host and a port.
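
The config example at the top of this page passes nsqd addresses as "host:port" strings, while this type is a {host, port} tuple. How ex_nsq converts between the two is not shown here. A minimal sketch of such a conversion, with hypothetical names and error atoms, might be:

```elixir
defmodule HostPortSketch do
  # Hypothetical helper, not part of ex_nsq.
  # Turns "127.0.0.1:6750" into {"127.0.0.1", 6750}.
  def parse(address) when is_binary(address) do
    case String.split(address, ":") do
      [host, port] when host != "" ->
        parse_port(host, port)

      # Missing port, empty host, or more than one colon.
      _ ->
        {:error, :invalid_address}
    end
  end

  defp parse_port(host, port) do
    case Integer.parse(port) do
      # Require the whole string to be digits and within the TCP port range.
      {n, ""} when n > 0 and n < 65_536 -> {:ok, {host, n}}
      _ -> {:error, :invalid_port}
    end
  end
end
```

This sketch rejects anything with more than one colon, so bare IPv6 literals are out of scope.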

pro_state()
pro_state() :: %{conn_sup_pid: pid, config: NSQ.Config.t}

A map, but we can be more specific by asserting some entries that should be set for a connection’s state map.

Functions

connect_to_nsqds(nsqds, pro, pro_state)
connect_to_nsqds([host_with_port], pid, pro_state) :: {:ok, pro_state}

Create supervised connections to NSQD.

get(sup_pid)
get(pid) :: pid

The end-user will be targeting the supervisor, but it’s the producer that can actually handle the command.
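
How get/1 resolves the producer from the supervisor is not documented on this page. One common OTP approach is to look the child up with Supervisor.which_children/1. The sketch below illustrates that general pattern only; ChildLookupSketch and the child id are hypothetical, and ex_nsq may do it differently.

```elixir
defmodule ChildLookupSketch do
  # Hypothetical helper, not part of ex_nsq.
  # Returns the pid of the child registered under `child_id`,
  # or nil if there is no such running child.
  def child_pid(sup_pid, child_id) do
    sup_pid
    |> Supervisor.which_children()
    |> Enum.find_value(fn
      # which_children/1 yields {id, child, type, modules}; child is a pid
      # only while the process is running.
      {^child_id, pid, _type, _modules} when is_pid(pid) -> pid
      _ -> nil
    end)
  end
end
```

With this pattern callers hold only the supervisor pid, and the worker is resolved on each call, so a restarted worker is picked up without the caller changing anything.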

get_connections(pro_state)
get_connections(pro_state) :: [connection]

get_connections(pro, pro_state \\ nil)
get_connections(pid, pro_state) :: [connection]

get_state(producer)
get_state(pid) :: pro_state

Get the current state of a producer. Used in tests. Not for external use.

mpub(sup_pid, data)
mpub(pid, binary) :: {:ok, binary}

Publish data to whatever topic is the default.

mpub(sup_pid, topic, data)
mpub(pid, binary, binary) :: {:ok, binary}

Publish data to a specific topic.

pub(sup_pid, data)
pub(pid, binary) :: {:ok, binary}

Publish data to whatever topic is the default.

pub(sup_pid, topic, data)
pub(pid, binary, binary) :: {:ok, binary}

Publish data to a specific topic.

random_connection_pid(pro_state)
random_connection_pid(pro_state) :: pid

start_link(topic, config)
start_link(binary, NSQ.Config.t) :: {:ok, pid}