Conduit v0.12.10 Conduit.Message

The Conduit message.

This module defines a Conduit.Message struct and the main functions for working with Conduit messages.

Note that this struct is used both for sending messages to a message queue and for receiving messages from one.

Public fields

These fields are for you to use in your application. The values in user_id, correlation_id, message_id, content_type, content_encoding, created_by, created_at, headers, and status may have special meaning depending on the adapter you use. See your adapter's documentation to understand how to use them correctly.

  • source - For incoming messages, this will be set to the queue the message was consumed from.
  • destination - For outgoing messages, this will be set to the destination queue (or routing key) it is published to.
  • user_id - An ID representing which user the message pertains to.
  • correlation_id - An ID for a chain of messages, where the current message is one in that chain.
  • message_id - A unique ID for this message.
  • content_type - The media type of the message body.
  • content_encoding - The encoding of the message body.
  • created_by - The name of the app that created the message.
  • created_at - A timestamp or epoch representing when the message was created.
  • headers - Information applicable to a specific message, stored as a map with string keys.
  • body - The contents of the message.
  • status - The operation to perform on the message. This only applies to messages that are being received.
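
As a minimal sketch of how the public fields are typically populated, the following builds an outgoing message using only functions documented on this page. It assumes the conduit dependency is already available in your project; the queue name, header name, and body are made-up values for illustration.

```elixir
import Conduit.Message

# Build an outgoing message step by step. Each put_* function
# returns an updated copy of the struct.
message =
  %Conduit.Message{}
  |> put_destination("my.queue")
  |> put_content_type("application/json")
  |> put_correlation_id("123")
  |> put_header("retries", 0)
  |> put_body("{}")

# Struct fields are read directly; headers are read with get_header/2.
message.destination
get_header(message, "retries")

# get_fields/1 collects the non-nil fields into a map.
get_fields(message)
```

Headers and body are not part of the map returned by get_fields/1, so they are read separately.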

Private fields

These fields are reserved for library/framework usage.

  • private - shared library data as a map

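Summary

Functions

  • ack(message) - Assigns the status of the message as acknowledged. This signals to the message queue that the message was processed successfully and can be discarded.
  • assign(message, key, value) - Assigns a named value to the message.
  • assigns(message, key) - Retrieves a named value from the message.
  • delete_header(message, key) - Deletes a header from the message specified by key.
  • get_fields(message) - Returns all non-nil fields from the message as a map.
  • get_header(message, key) - Returns a header from the message specified by key.
  • get_private(message, key) - Retrieves a named value from the message. This is intended for library and framework use.
  • merge_fields(to, from, fields) - Merges fields to one message from another.
  • merge_headers(to, from, headers) - Merges headers to one message from another.
  • nack(message) - Assigns the status of the message as negatively acknowledged. This signals to the message queue that processing the message was not successful.
  • put_body(message, body) - Assigns the content of the message.
  • put_content_encoding(message, content_encoding) - Assigns a content_encoding to the message.
  • put_content_type(message, content_type) - Assigns a content_type to the message.
  • put_correlation_id(message, correlation_id) - Assigns a correlation_id to the message.
  • put_created_at(message, created_at) - Assigns a created_at to the message.
  • put_created_by(message, created_by) - Assigns a created_by to the message.
  • put_destination(message, destination) - Assigns the destination of the message.
  • put_header(message, key, value) - Assigns a header for the message specified by key.
  • put_headers(message, headers) - Assigns multiple headers to the message from a map.
  • put_message_id(message, message_id) - Assigns a message_id to the message.
  • put_new_correlation_id(message, correlation_id) - Assigns a correlation_id to the message when one isn’t set already.
  • put_new_destination(message, destination) - Assigns a destination to the message when one isn’t set already.
  • put_new_message_id(message, message_id) - Assigns a message_id to the message when one isn’t set already.
  • put_new_source(message, source) - Assigns a source to the message when one isn’t set already.
  • put_private(message, key, value) - Assigns a named value to the message. This is intended for library and framework use.
  • put_source(message, source) - Assigns the source of the message.
  • put_user_id(message, user_id) - Assigns a user_id to the message.
  • take(from, opts) - Creates a new message with the fields and headers specified.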

Types

assigns() :: %{optional(atom()) => any()}
content_encoding() :: String.t() | (... -> any()) | nil
content_type() :: String.t() | (... -> any()) | nil
correlation_id() :: binary() | integer() | (... -> any()) | nil
created_at() :: String.t() | integer() | (... -> any()) | nil
created_by() :: binary() | (... -> any()) | nil
destination() :: binary() | (... -> any()) | nil
headers() :: %{optional(String.t()) => any()}
message_id() :: binary() | integer() | (... -> any()) | nil
private() :: %{optional(atom()) => any()}
source() :: binary() | (... -> any()) | nil
status() :: :ack | :nack
t() :: %Conduit.Message{
  assigns: assigns(),
  body: body(),
  content_encoding: content_encoding(),
  content_type: content_type(),
  correlation_id: correlation_id(),
  created_at: created_at(),
  created_by: created_by(),
  destination: destination(),
  headers: headers(),
  message_id: message_id(),
  private: private(),
  source: source(),
  status: status(),
  user_id: user_id()
}
user_id() :: binary() | integer() | (... -> any()) | nil

Functions

ack(message)

Assigns the status of the message as acknowledged. This signals to the message queue that the message was processed successfully and can be discarded.

Examples

iex> import Conduit.Message
iex> message = ack(%Conduit.Message{})
iex> message.status
:ack
assign(message, key, value)

Assigns a named value to the message.

Examples

iex> import Conduit.Message
iex> message = assign(%Conduit.Message{}, :user_id, 1)
iex> assigns(message, :user_id)
1

assigns(message, key)

Retrieves a named value from the message.

Examples

iex> import Conduit.Message
iex> message = assign(%Conduit.Message{}, :user_id, 1)
iex> assigns(message, :user_id)
1
delete_header(message, key)
delete_header(Conduit.Message.t(), String.t()) :: Conduit.Message.t()

Deletes a header from the message specified by key.

Examples

iex> import Conduit.Message
iex> message = put_header(%Conduit.Message{}, "retries", 1)
iex> message = delete_header(message, "retries")
iex> get_header(message, "retries")
nil
get_fields(message)
get_fields(Conduit.Message.t()) :: %{optional(atom()) => term()}

Returns all non-nil fields from the message as a map.

The following fields will be returned:

  • :source
  • :destination
  • :user_id
  • :correlation_id
  • :message_id
  • :content_type
  • :content_encoding
  • :created_by
  • :created_at

Examples

iex> import Conduit.Message
iex> message =
...>   %Conduit.Message{}
...>   |> put_message_id("1")
...>   |> put_correlation_id("2")
iex> get_fields(message)
%{
  message_id: "1",
  correlation_id: "2"
}
get_header(message, key)
get_header(Conduit.Message.t(), String.t()) :: any()

Returns a header from the message specified by key.

Examples

iex> import Conduit.Message
iex> message = put_header(%Conduit.Message{}, "retries", 1)
iex> get_header(message, "retries")
1
get_private(message, key)
get_private(Conduit.Message.t(), atom()) :: term()

Retrieves a named value from the message. This is intended for library and framework use.

Examples

iex> import Conduit.Message
iex> message = put_private(%Conduit.Message{}, :message_id, 1)
iex> get_private(message, :message_id)
1
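
The examples for assign/3 and put_private/3 both reuse names that also exist as struct fields, which can obscure that these are three separate storage areas. The sketch below, which assumes the conduit dependency is available and uses arbitrary values, stores a different value under the same name in each place to show that they do not interfere.

```elixir
import Conduit.Message

message =
  %Conduit.Message{}
  |> assign(:user_id, 1)       # application data, kept in message.assigns
  |> put_private(:user_id, 2)  # library data, kept in message.private
  |> put_user_id(3)            # the public user_id field itself

assigns(message, :user_id)      # reads from message.assigns
get_private(message, :user_id)  # reads from message.private
message.user_id                 # reads the struct field
```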
merge_fields(to, from, fields \\ [:source, :destination, :user_id, :correlation_id, :message_id, :content_type, :content_encoding, :created_by, :created_at, :status])
merge_fields(
  to :: Conduit.Message.t(),
  from :: Conduit.Message.t(),
  fields :: [atom()]
) :: Conduit.Message.t()

Merges the given fields from one message into another. When fields is omitted, the default list shown in the signature is used.

Examples

iex> import Conduit.Message
iex> old_message = put_correlation_id(%Conduit.Message{}, "123")
iex> new_message = Conduit.Message.merge_fields(%Conduit.Message{}, old_message)
iex> new_message.correlation_id
"123"
iex> new_message = Conduit.Message.merge_fields(%Conduit.Message{}, old_message, [:correlation_id])
iex> new_message.correlation_id
"123"
merge_headers(to, from, headers)
merge_headers(
  to :: Conduit.Message.t(),
  from :: Conduit.Message.t(),
  headers :: [String.t()]
) :: Conduit.Message.t()

Merges the given headers from one message into another.

Examples

iex> import Conduit.Message
iex> old_message = put_header(%Conduit.Message{}, "retries", 1)
iex> new_message = Conduit.Message.merge_headers(%Conduit.Message{}, old_message, ["retries"])
iex> get_header(new_message, "retries")
1

nack(message)

Assigns the status of the message as negatively acknowledged. This signals to the message queue that processing the message was not successful.

Examples

iex> import Conduit.Message
iex> message = nack(%Conduit.Message{})
iex> message.status
:nack
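
To illustrate how ack/1 and nack/1 might be used together, here is a minimal sketch of a helper that sets the status based on the outcome of some processing step. The module StatusSketch and its mark/2 function are hypothetical names invented for this example and are not part of Conduit; the sketch assumes the conduit dependency is available.

```elixir
defmodule StatusSketch do
  import Conduit.Message

  # Hypothetical helper: runs `process` on the message body and marks
  # the message as acknowledged on :ok, negatively acknowledged otherwise.
  def mark(message, process) when is_function(process, 1) do
    case process.(message.body) do
      :ok -> ack(message)
      _other -> nack(message)
    end
  end
end

succeeded = StatusSketch.mark(%Conduit.Message{body: "ok"}, fn _body -> :ok end)
failed = StatusSketch.mark(%Conduit.Message{body: "bad"}, fn _body -> {:error, :invalid} end)
```

How the resulting status is acted upon depends on the adapter in use.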

put_body(message, body)

Assigns the content of the message.

Examples

iex> import Conduit.Message
iex> message = put_body(%Conduit.Message{}, "hi")
iex> message.body
"hi"
iex> message = put_body(message, fn _mess -> "bye" end)
iex> message.body
"bye"
put_content_encoding(message, content_encoding)
put_content_encoding(Conduit.Message.t(), content_encoding()) ::
  Conduit.Message.t()

Assigns a content_encoding to the message.

Examples

iex> import Conduit.Message
iex> message = put_content_encoding(%Conduit.Message{}, "gzip")
iex> message.content_encoding
"gzip"
iex> message = put_content_encoding(%Conduit.Message{}, fn _mess -> "gzip" end)
iex> message.content_encoding
"gzip"
put_content_type(message, content_type)
put_content_type(Conduit.Message.t(), content_type()) :: Conduit.Message.t()

Assigns a content_type to the message.

Examples

iex> import Conduit.Message
iex> message = put_content_type(%Conduit.Message{}, "application/json")
iex> message.content_type
"application/json"
iex> message = put_content_type(%Conduit.Message{}, fn _mess -> "application/json" end)
iex> message.content_type
"application/json"
put_correlation_id(message, correlation_id)
put_correlation_id(Conduit.Message.t(), correlation_id()) :: Conduit.Message.t()

Assigns a correlation_id to the message.

Examples

iex> import Conduit.Message
iex> message = put_correlation_id(%Conduit.Message{}, 1)
iex> message.correlation_id
1
iex> message = put_correlation_id(message, fn _mess -> 2 end)
iex> message.correlation_id
2
put_created_at(message, created_at)
put_created_at(Conduit.Message.t(), created_at()) :: Conduit.Message.t()

Assigns a created_at to the message.

Examples

iex> import Conduit.Message
iex> message = put_created_at(%Conduit.Message{}, 1)
iex> message.created_at
1
iex> message = put_created_at(%Conduit.Message{}, fn _mess -> 1 end)
iex> message.created_at
1
put_created_by(message, created_by)
put_created_by(Conduit.Message.t(), created_by()) :: Conduit.Message.t()

Assigns a created_by to the message.

Examples

iex> import Conduit.Message
iex> message = put_created_by(%Conduit.Message{}, "my_app")
iex> message.created_by
"my_app"
iex> message = put_created_by(%Conduit.Message{}, fn _mess -> "my_app" end)
iex> message.created_by
"my_app"
put_destination(message, destination)
put_destination(Conduit.Message.t(), destination()) :: Conduit.Message.t()

Assigns the destination of the message.

Examples

iex> import Conduit.Message
iex> message =
...>   %Conduit.Message{}
...>   |> put_source("over.there")
...>   |> put_destination("my.queue")
iex> message.destination
"my.queue"
iex> message = put_destination(message, fn mess -> mess.source <> ".error" end)
iex> message.destination
"over.there.error"
put_header(message, key, value)
put_header(Conduit.Message.t(), String.t(), any()) :: Conduit.Message.t()

Assigns a header for the message specified by key.

Examples

iex> import Conduit.Message
iex> message = put_header(%Conduit.Message{}, "retries", 1)
iex> get_header(message, "retries")
1
iex> message = put_header(message, "retries", fn mess -> get_header(mess, "retries") + 1 end)
iex> get_header(message, "retries")
2
put_headers(message, headers)
put_headers(Conduit.Message.t(), %{optional(String.t()) => any()}) ::
  Conduit.Message.t()

Assigns multiple headers to the message from a map of keys to values.

Examples

iex> import Conduit.Message
iex> message = put_headers(%Conduit.Message{}, %{"retries" => 1})
iex> get_header(message, "retries")
1
iex> message = put_headers(message, %{"retries" => fn mess -> get_header(mess, "retries") + 1 end})
iex> get_header(message, "retries")
2
put_message_id(message, message_id)
put_message_id(Conduit.Message.t(), message_id()) :: Conduit.Message.t()

Assigns a message_id to the message.

Examples

iex> import Conduit.Message
iex> message = put_message_id(%Conduit.Message{}, 1)
iex> message.message_id
1
iex> message = put_message_id(%Conduit.Message{}, fn _mess -> 1 end)
iex> message.message_id
1
put_new_correlation_id(message, correlation_id)
put_new_correlation_id(Conduit.Message.t(), correlation_id()) ::
  Conduit.Message.t()

Assigns a correlation_id to the message when one isn’t set already.

Examples

iex> import Conduit.Message
iex> message = put_new_correlation_id(%Conduit.Message{}, 1)
iex> message = put_new_correlation_id(message, 2)
iex> message.correlation_id
1
iex> message = put_new_correlation_id(%Conduit.Message{}, fn _mess -> 1 end)
iex> message = put_new_correlation_id(message, fn _mess -> 2 end)
iex> message.correlation_id
1
put_new_destination(message, destination)
put_new_destination(Conduit.Message.t(), destination()) :: Conduit.Message.t()

Assigns a destination to the message when one isn’t set already.

Examples

iex> import Conduit.Message
iex> message = put_new_destination(%Conduit.Message{}, "your.queue")
iex> message = put_new_destination(message, "my.queue")
iex> message.destination
"your.queue"
iex> message = put_new_destination(%Conduit.Message{}, fn _mess -> "your.queue" end)
iex> message = put_new_destination(message, fn _mess -> "my.queue" end)
iex> message.destination
"your.queue"
put_new_message_id(message, message_id)
put_new_message_id(Conduit.Message.t(), message_id()) :: Conduit.Message.t()

Assigns a message_id to the message when one isn’t set already.

Examples

iex> import Conduit.Message
iex> message = put_new_message_id(%Conduit.Message{}, 1)
iex> message = put_new_message_id(message, 2)
iex> message.message_id
1
iex> message = put_new_message_id(%Conduit.Message{}, fn _mess -> 1 end)
iex> message = put_new_message_id(message, fn _mess -> 2 end)
iex> message.message_id
1
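
Because the put_new_* functions leave existing values untouched, they lend themselves to filling in defaults. The following sketch chains several of them; DefaultsSketch, with_defaults/2, and the queue name are hypothetical names chosen for illustration, and the sketch assumes that a function value passed to put_new_correlation_id/2 receives the message, as it does in the put_destination/2 example.

```elixir
defmodule DefaultsSketch do
  import Conduit.Message

  # Hypothetical helper: fills in identifiers and a destination only
  # where the message does not already carry them.
  def with_defaults(message, id) do
    message
    |> put_new_message_id(id)
    # Assumption: the function is called with the message, so the
    # correlation_id can default to the message_id set above.
    |> put_new_correlation_id(fn mess -> mess.message_id end)
    |> put_new_destination("default.queue")
  end
end

fresh = DefaultsSketch.with_defaults(%Conduit.Message{}, "m-1")

existing =
  %Conduit.Message{}
  |> Conduit.Message.put_correlation_id("c-9")
  |> DefaultsSketch.with_defaults("m-2")
```

In this sketch, fresh picks up every default, while existing keeps the correlation_id it already had.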
put_new_source(message, source)
put_new_source(Conduit.Message.t(), source()) :: Conduit.Message.t()

Assigns a source to the message when one isn’t set already.

Examples

iex> import Conduit.Message
iex> message = put_new_source(%Conduit.Message{}, "my.queue")
iex> message = put_new_source(message, "your.queue")
iex> message.source
"my.queue"
iex> message = put_new_source(%Conduit.Message{}, fn _mess -> "my.queue" end)
iex> message = put_new_source(message, fn _mess -> "your.queue" end)
iex> message.source
"my.queue"
put_private(message, key, value)
put_private(Conduit.Message.t(), atom(), any()) :: Conduit.Message.t()

Assigns a named value to the message. This is intended for library and framework use.

Examples

iex> import Conduit.Message
iex> message = put_private(%Conduit.Message{}, :message_id, 1)
iex> get_private(message, :message_id)
1
put_source(message, source)

Assigns the source of the message.

Examples

iex> import Conduit.Message
iex> message =
...>   %Conduit.Message{}
...>   |> put_source("my.queue")
...>   |> put_header("routing_key", "my.routing_key")
iex> message.source
"my.queue"
iex> message = put_source(message, fn mess ->
...>   get_header(mess, "routing_key")
...> end)
iex> message.source
"my.routing_key"
put_user_id(message, user_id)

Assigns a user_id to the message.

Examples

iex> import Conduit.Message
iex> message = put_user_id(%Conduit.Message{}, 1)
iex> message.user_id
1
iex> message = put_user_id(message, fn _mess -> 2 end)
iex> message.user_id
2
take(from, opts)
take(
  from :: Conduit.Message.t(),
  opts :: [fields: [atom()], headers: [String.t()]]
) :: Conduit.Message.t()

Creates a new message with the fields and headers specified.

Examples

iex> import Conduit.Message
iex> old_message =
...>   %Conduit.Message{}
...>   |> put_correlation_id("123")
...>   |> put_header("retries", 1)
iex> new_message = Conduit.Message.take(old_message,
...>   headers: ["retries"], fields: [:correlation_id])
iex> new_message.correlation_id
"123"
iex> get_header(new_message, "retries")
1
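
As a sketch of where take/2 can be useful, the following derives a follow-up message from an incoming one, carrying over selected fields and headers and then adjusting them with function values. It assumes the conduit dependency is available; the queue names, the "retries" header, and the ".error" suffix are illustrative choices taken from the examples above rather than Conduit conventions.

```elixir
import Conduit.Message

incoming =
  %Conduit.Message{}
  |> put_source("my.queue")
  |> put_correlation_id("123")
  |> put_header("retries", 1)
  |> put_body("payload")

follow_up =
  incoming
  # Carry over only the listed fields and headers.
  |> take(fields: [:source, :correlation_id], headers: ["retries"])
  # Set the body explicitly rather than relying on take/2 for it.
  |> put_body(incoming.body)
  # Function values receive the message being built.
  |> put_header("retries", fn mess -> get_header(mess, "retries") + 1 end)
  |> put_destination(fn mess -> mess.source <> ".error" end)
```

Fields not listed in fields, such as message_id in this sketch, are not requested from the original message.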