Conduit v0.12.9 Conduit.Message
The Conduit message.
This module defines a Conduit.Message struct and the main functions for working with Conduit messages.
Note that this struct is used both for sending messages to and for receiving messages from a message queue.
Public fields
These fields are for you to use in your application. The values in user_id, correlation_id, message_id, content_type, content_encoding, created_by, created_at, headers, and status may have special meaning based on the adapter you use.
See your adapter's documentation to understand how to use them correctly.
source - For incoming messages, this will be set to the queue the message was consumed from.
destination - For outgoing messages, this will be set to the destination queue (or routing key) it is published to.
user_id - An ID representing which user the message pertains to.
correlation_id - An ID for a chain of messages, where the current message is one in that chain.
message_id - A unique ID for this message.
content_type - The media type of the message body.
content_encoding - The encoding of the message body.
created_by - The name of the app that created the message.
created_at - A timestamp or epoch representing when the message was created.
headers - Information applicable to a specific message stored as a keyword list.
body - The contents of the message.
status - The operation to perform on the message. This only applies to messages that are being received.
Private fields
These fields are reserved for library/framework usage.
private - Shared library data as a map.
Summary
Functions
ack/1 - Assigns the status of the message as acknowledged. This signals to the message queue that the message was processed successfully and can be discarded
assign/3 - Assigns a named value to the message
assigns/2 - Retrieves a named value from the message
delete_header/2 - Deletes a header from the message specified by key
get_fields/1 - Returns all non-nil fields from the message as a map
get_header/2 - Returns a header from the message specified by key
get_private/2 - Retrieves a named value from the message. This is intended for libraries and framework use
merge_fields/3 - Merges fields from one message into another
merge_headers/3 - Merges headers from one message into another
nack/1 - Assigns the status of the message as negatively acknowledged. This signals to the message queue that the message was not processed successfully
put_body/2 - Assigns the content of the message
put_content_encoding/2 - Assigns a content_encoding to the message
put_content_type/2 - Assigns a content_type to the message
put_correlation_id/2 - Assigns a correlation_id to the message
put_created_at/2 - Assigns a created_at to the message
put_created_by/2 - Assigns a created_by to the message
put_destination/2 - Assigns the destination of the message
put_header/3 - Assigns a header for the message specified by key
put_headers/2 - Assigns multiple headers to the message from a map of keys to values
put_message_id/2 - Assigns a message_id to the message
put_new_correlation_id/2 - Assigns a correlation_id to the message when one isn’t set already
put_new_destination/2 - Assigns a destination to the message when one isn’t set already
put_new_message_id/2 - Assigns a message_id to the message when one isn’t set already
put_new_source/2 - Assigns a source to the message when one isn’t set already
put_private/3 - Assigns a named value to the message. This is intended for libraries and framework use
put_source/2 - Assigns the source of the message
put_user_id/2 - Assigns a user_id to the message
take/2 - Creates a new message with the fields and headers specified
Types
t() :: %Conduit.Message{
  assigns: assigns(),
  body: body(),
  content_encoding: content_encoding(),
  content_type: content_type(),
  correlation_id: correlation_id(),
  created_at: created_at(),
  created_by: created_by(),
  destination: destination(),
  headers: headers(),
  message_id: message_id(),
  private: private(),
  source: source(),
  status: status(),
  user_id: user_id()
}
Functions
Assigns the status of the message as acknowledged. This signals to the message queue that the message was processed successfully and can be discarded.
Examples
iex> import Conduit.Message
iex> message = ack(%Conduit.Message{})
iex> message.status
:ack
assign(Conduit.Message.t(), atom(), any()) :: Conduit.Message.t()
Assigns a named value to the message.
Examples
iex> import Conduit.Message
iex> message = assign(%Conduit.Message{}, :user_id, 1)
iex> assigns(message, :user_id)
1
assigns(Conduit.Message.t(), term()) :: Conduit.Message.t()
Retrieves a named value from the message.
Examples
iex> import Conduit.Message
iex> message = assign(%Conduit.Message{}, :user_id, 1)
iex> assigns(message, :user_id)
1
delete_header(Conduit.Message.t(), String.t()) :: Conduit.Message.t()
Deletes a header from the message specified by key.
Examples
iex> import Conduit.Message
iex> message = put_header(%Conduit.Message{}, "retries", 1)
iex> message = delete_header(message, "retries")
iex> get_header(message, "retries")
nil
get_fields(Conduit.Message.t()) :: %{optional(atom()) => term()}
Returns all non-nil fields from the message as a map.
The following fields will be returned:
:source
:destination
:user_id
:correlation_id
:message_id
:content_type
:content_encoding
:created_by
:created_at
Examples
iex> import Conduit.Message
iex> message =
...>   %Conduit.Message{}
...>   |> put_message_id("1")
...>   |> put_correlation_id("2")
iex> get_fields(message)
%{
  message_id: "1",
  correlation_id: "2"
}
get_header(Conduit.Message.t(), String.t()) :: any()
Returns a header from the message specified by key.
Examples
iex> import Conduit.Message
iex> message = put_header(%Conduit.Message{}, "retries", 1)
iex> get_header(message, "retries")
1
get_private(Conduit.Message.t(), atom()) :: term()
Retrieves a named value from the message. This is intended for libraries and framework use.
Examples
iex> import Conduit.Message
iex> message = put_private(%Conduit.Message{}, :message_id, 1)
iex> get_private(message, :message_id)
1
merge_fields(to :: Conduit.Message.t(), from :: Conduit.Message.t(), fields :: [atom()]) :: Conduit.Message.t()
Merges fields from one message (from) into another (to). The list of fields is optional, as the first example below shows.
Examples
iex> import Conduit.Message
iex> old_message = put_correlation_id(%Conduit.Message{}, "123")
iex> new_message = Conduit.Message.merge_fields(%Conduit.Message{}, old_message)
iex> new_message.correlation_id
"123"
iex> new_message = Conduit.Message.merge_fields(%Conduit.Message{}, old_message, [:correlation_id])
iex> new_message.correlation_id
"123"
merge_headers(to :: Conduit.Message.t(), from :: Conduit.Message.t(), headers :: [String.t()]) :: Conduit.Message.t()
Merges the listed headers from one message (from) into another (to).
Examples
iex> import Conduit.Message
iex> old_message = put_header(%Conduit.Message{}, "retries", 1)
iex> new_message = Conduit.Message.merge_headers(%Conduit.Message{}, old_message, ["retries"])
iex> get_header(new_message, "retries")
1
Assigns the status of the message as negatively acknowledged. This signals to the message queue that the message was not processed successfully.
Examples
iex> import Conduit.Message
iex> message = nack(%Conduit.Message{})
iex> message.status
:nack
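As a rough illustration of how ack and nack might be used together, the sketch below sets the status from the outcome of a processing step. The settle function and the process callback are hypothetical names introduced here, not part of Conduit, and the snippet assumes it runs in a project that already depends on :conduit.

```elixir
# Assumes :conduit is already a dependency of the surrounding Mix project.
import Conduit.Message

# Hypothetical helper (not part of Conduit): runs `process` on the message
# body and marks the message :ack on :ok or :nack on {:error, reason}.
settle = fn message, process ->
  case process.(message.body) do
    :ok -> ack(message)
    {:error, _reason} -> nack(message)
  end
end

message = put_body(%Conduit.Message{}, "hi")

succeeded = settle.(message, fn _body -> :ok end)
failed = settle.(message, fn _body -> {:error, :boom} end)

IO.inspect(succeeded.status)
IO.inspect(failed.status)
```

What the queue does with a negatively acknowledged message depends on the adapter in use.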
put_body(Conduit.Message.t(), body()) :: Conduit.Message.t()
Assigns the content of the message.
Examples
iex> import Conduit.Message
iex> message = put_body(%Conduit.Message{}, "hi")
iex> message.body
"hi"
iex> message = put_body(message, fn _mess -> "bye" end)
iex> message.body
"bye"
put_content_encoding(Conduit.Message.t(), content_encoding()) :: Conduit.Message.t()
Assigns a content_encoding to the message.
Examples
iex> import Conduit.Message
iex> message = put_content_encoding(%Conduit.Message{}, "gzip")
iex> message.content_encoding
"gzip"
iex> message = put_content_encoding(%Conduit.Message{}, fn _mess -> "gzip" end)
iex> message.content_encoding
"gzip"
put_content_type(Conduit.Message.t(), content_type()) :: Conduit.Message.t()
Assigns a content_type to the message.
Examples
iex> import Conduit.Message
iex> message = put_content_type(%Conduit.Message{}, "application/json")
iex> message.content_type
"application/json"
iex> message = put_content_type(%Conduit.Message{}, fn _mess -> "application/json" end)
iex> message.content_type
"application/json"
put_correlation_id(Conduit.Message.t(), correlation_id()) :: Conduit.Message.t()
Assigns a correlation_id to the message.
Examples
iex> import Conduit.Message
iex> message = put_correlation_id(%Conduit.Message{}, 1)
iex> message.correlation_id
1
iex> message = put_correlation_id(message, fn _mess -> 2 end)
iex> message.correlation_id
2
put_created_at(Conduit.Message.t(), created_at()) :: Conduit.Message.t()
Assigns a created_at to the message.
Examples
iex> import Conduit.Message
iex> message = put_created_at(%Conduit.Message{}, 1)
iex> message.created_at
1
iex> message = put_created_at(%Conduit.Message{}, fn _mess -> 1 end)
iex> message.created_at
1
put_created_by(Conduit.Message.t(), created_by()) :: Conduit.Message.t()
Assigns a created_by to the message.
Examples
iex> import Conduit.Message
iex> message = put_created_by(%Conduit.Message{}, "my_app")
iex> message.created_by
"my_app"
iex> message = put_created_by(%Conduit.Message{}, fn _mess -> "my_app" end)
iex> message.created_by
"my_app"
put_destination(Conduit.Message.t(), destination()) :: Conduit.Message.t()
Assigns the destination of the message.
Examples
iex> import Conduit.Message
iex> message =
...>   %Conduit.Message{}
...>   |> put_source("over.there")
...>   |> put_destination("my.queue")
iex> message.destination
"my.queue"
iex> message = put_destination(message, fn mess -> mess.source <> ".error" end)
iex> message.destination
"over.there.error"
put_header(Conduit.Message.t(), String.t(), any()) :: Conduit.Message.t()
Assigns a header for the message specified by key.
Examples
iex> import Conduit.Message
iex> message = put_header(%Conduit.Message{}, "retries", 1)
iex> get_header(message, "retries")
1
iex> message = put_header(message, "retries", fn mess -> get_header(mess, "retries") + 1 end)
iex> get_header(message, "retries")
2
put_headers(Conduit.Message.t(), %{optional(String.t()) => any()}) :: Conduit.Message.t()
Assigns multiple headers to the message from a map of keys to values.
Examples
iex> import Conduit.Message
iex> message = put_headers(%Conduit.Message{}, %{"retries" => 1})
iex> get_header(message, "retries")
1
iex> message = put_headers(message, %{"retries" => fn mess -> get_header(mess, "retries") + 1 end})
iex> get_header(message, "retries")
2
put_message_id(Conduit.Message.t(), message_id()) :: Conduit.Message.t()
Assigns a message_id to the message.
Examples
iex> import Conduit.Message
iex> message = put_message_id(%Conduit.Message{}, 1)
iex> message.message_id
1
iex> message = put_message_id(%Conduit.Message{}, fn _mess -> 1 end)
iex> message.message_id
1
put_new_correlation_id(Conduit.Message.t(), correlation_id()) :: Conduit.Message.t()
Assigns a correlation_id to the message when one isn’t set already.
Examples
iex> import Conduit.Message
iex> message = put_new_correlation_id(%Conduit.Message{}, 1)
iex> message = put_new_correlation_id(message, 2)
iex> message.correlation_id
1
iex> message = put_new_correlation_id(%Conduit.Message{}, fn _mess -> 1 end)
iex> message = put_new_correlation_id(message, fn _mess -> 2 end)
iex> message.correlation_id
1
put_new_destination(Conduit.Message.t(), destination()) :: Conduit.Message.t()
Assigns a destination to the message when one isn’t set already.
Examples
iex> import Conduit.Message
iex> message = put_new_destination(%Conduit.Message{}, "your.queue")
iex> message = put_new_destination(message, "my.queue")
iex> message.destination
"your.queue"
iex> message = put_new_destination(%Conduit.Message{}, fn _mess -> "your.queue" end)
iex> message = put_new_destination(message, fn _mess -> "my.queue" end)
iex> message.destination
"your.queue"
put_new_message_id(Conduit.Message.t(), message_id()) :: Conduit.Message.t()
Assigns a message_id to the message when one isn’t set already.
Examples
iex> import Conduit.Message
iex> message = put_new_message_id(%Conduit.Message{}, 1)
iex> message = put_new_message_id(message, 2)
iex> message.message_id
1
iex> message = put_new_message_id(%Conduit.Message{}, fn _mess -> 1 end)
iex> message = put_new_message_id(message, fn _mess -> 2 end)
iex> message.message_id
1
put_new_source(Conduit.Message.t(), source()) :: Conduit.Message.t()
Assigns a source to the message when one isn’t set already.
Examples
iex> import Conduit.Message
iex> message = put_new_source(%Conduit.Message{}, "my.queue")
iex> message = put_new_source(message, "your.queue")
iex> message.source
"my.queue"
iex> message = put_new_source(%Conduit.Message{}, fn _mess -> "my.queue" end)
iex> message = put_new_source(message, fn _mess -> "your.queue" end)
iex> message.source
"my.queue"
put_private(Conduit.Message.t(), atom(), any()) :: Conduit.Message.t()
Assigns a named value to the message. This is intended for libraries and framework use.
Examples
iex> import Conduit.Message
iex> message = put_private(%Conduit.Message{}, :message_id, 1)
iex> get_private(message, :message_id)
1
put_source(Conduit.Message.t(), source()) :: Conduit.Message.t()
Assigns the source of the message.
Examples
iex> import Conduit.Message
iex> message =
...>   %Conduit.Message{}
...>   |> put_source("my.queue")
...>   |> put_header("routing_key", "my.routing_key")
iex> message.source
"my.queue"
iex> message = put_source(message, fn mess ->
...>   get_header(mess, "routing_key")
...> end)
iex> message.source
"my.routing_key"
put_user_id(Conduit.Message.t(), user_id()) :: Conduit.Message.t()
Assigns a user_id to the message.
Examples
iex> import Conduit.Message
iex> message = put_user_id(%Conduit.Message{}, 1)
iex> message.user_id
1
iex> message = put_user_id(message, fn _mess -> 2 end)
iex> message.user_id
2
take(from :: Conduit.Message.t(), opts :: [fields: [atom()], headers: [String.t()]]) :: Conduit.Message.t()
Creates a new message with the fields and headers specified.
Examples
iex> import Conduit.Message
iex> old_message =
...>   %Conduit.Message{}
...>   |> put_correlation_id("123")
...>   |> put_header("retries", 1)
iex> new_message =
...>   Conduit.Message.take(old_message, headers: ["retries"], fields: [:correlation_id])
iex> new_message.correlation_id
"123"
iex> get_header(new_message, "retries")
1
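One way take might be combined with the put_* functions is to build a follow-up message that stays in the same chain as the message that triggered it. This is only a sketch: the queue names and IDs are made up for illustration, and the snippet assumes it runs in a project that already depends on :conduit.

```elixir
# Assumes :conduit is already a dependency of the surrounding Mix project.
import Conduit.Message

# A message as it might look after being consumed (all values are illustrative).
incoming =
  %Conduit.Message{}
  |> put_source("orders.created")
  |> put_correlation_id("abc-123")
  |> put_header("retries", 1)
  |> put_body("order payload")

# Copy only the chain ID and the retry count into a new message,
# then give the new message its own destination and body.
follow_up =
  incoming
  |> take(fields: [:correlation_id], headers: ["retries"])
  |> put_destination("orders.audit")
  |> put_body("order recorded")

IO.inspect(follow_up.correlation_id)
IO.inspect(get_header(follow_up, "retries"))
IO.inspect(follow_up.destination)
```

Listing the fields and headers explicitly keeps the follow-up message from inheriting values, such as the body, that belong only to the original.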