BroadwayCloudPubSub.Producer (BroadwayCloudPubSub v0.9.1)
A GenStage producer that continuously receives messages from a Google Cloud Pub/Sub topic and acknowledges them after they have been successfully processed.
By default, this producer uses BroadwayCloudPubSub.PullClient to talk to Cloud Pub/Sub, but you can provide your own client by implementing the BroadwayCloudPubSub.Client behaviour.
For a quick introduction to using Broadway with Cloud Pub/Sub, see the Google Cloud Pub/Sub Guide.
Options
Aside from :receive_interval and :client, which are generic and apply to all producers (regardless of the client implementation), all other options are specific to BroadwayCloudPubSub.PullClient, which is the default client.
:client - A module that implements the BroadwayCloudPubSub.Client behaviour. This module is responsible for fetching and acknowledging the messages. Note that all options passed to the producer are forwarded to the client, and it is up to the client to normalize the options it needs. The default value is BroadwayCloudPubSub.PullClient.
:subscription - Required. The name of the subscription, including the project. For example, if your project is named "my-project" and your subscription is named "my-subscription", then your subscription name is "projects/my-project/subscriptions/my-subscription".
:max_number_of_messages - The maximum number of messages to be fetched per request. The default value is 10.
:on_failure - Configures the acking behaviour for failed messages. See the "Acknowledgements" section below for all the possible values. This option can also be changed for each message through Broadway.Message.configure_ack/2. The default value is :noop.
:on_success - Configures the acking behaviour for successful messages. See the "Acknowledgements" section below for all the possible values. This option can also be changed for each message through Broadway.Message.configure_ack/2. The default value is :ack.
:receive_interval - The duration (in milliseconds) for which the producer waits before making a request for more messages. The default value is 5000.
:receive_timeout - The maximum time (in milliseconds) to wait for a response before the pull client returns an error. The default value is :infinity.
:goth - The Goth module to use for authentication. Note that this option only applies to the default token generator.
:token_generator - An MFArgs tuple that will be called before each request to fetch an authentication token. It should return {:ok, String.t()} | {:error, any()}. By default this will invoke Goth.fetch/1 with the :goth option. See the "Custom token generator" section below for more information.
:base_url - The base URL for the Cloud Pub/Sub service. This option is mostly useful for testing via the Pub/Sub emulator. The default value is "https://pubsub.googleapis.com".
:finch - The name of the Finch pool. If no name is provided, then a default pool will be started by the pipeline's supervisor. The default value is nil.
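As a rough illustration of how these options fit together, the sketch below builds a producer option list aimed at a locally running Pub/Sub emulator. The module names MyApp.EmulatorToken and MyApp.PubSubConfig, the dummy token, and the address http://localhost:8085 are assumptions made for this example and are not part of the library. It also assumes the emulator does not validate the token it receives.

```elixir
defmodule MyApp.EmulatorToken do
  @moduledoc false

  # Hypothetical token generator for local testing only.
  # Returns a fixed string in the {:ok, String.t()} shape the producer expects.
  def fetch, do: {:ok, "emulator-token"}
end

defmodule MyApp.PubSubConfig do
  @moduledoc false

  # Builds the keyword list handed to BroadwayCloudPubSub.Producer.
  # Values other than :base_url and :token_generator repeat the documented defaults.
  def producer_opts do
    [
      subscription: "projects/my-project/subscriptions/my-subscription",
      max_number_of_messages: 10,
      receive_interval: 5_000,
      on_success: :ack,
      on_failure: :noop,
      # Assumed emulator address; adjust to wherever your emulator listens.
      base_url: "http://localhost:8085",
      token_generator: {MyApp.EmulatorToken, :fetch, []}
    ]
  end
end
```

The resulting list could then be passed as module: {BroadwayCloudPubSub.Producer, MyApp.PubSubConfig.producer_opts()} in the producer configuration.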
Custom token generator
A custom token generator can be given as an MFArgs tuple.
For example, define a MyApp.fetch_token/0 function:
defmodule MyApp do
  @doc "Fetches a Google auth token"
  def fetch_token do
    # Unwrap the Goth token struct and return only the token string
    with {:ok, token} <- Goth.fetch(MyApp.Goth) do
      {:ok, token.token}
    end
  end
end
and configure the producer to use it:
token_generator: {MyApp, :fetch_token, []}
Acknowledgements
You can use the :on_success and :on_failure options to control how messages are acknowledged with the Pub/Sub system.
By default, successful messages are acknowledged and failed messages are ignored.
You can set :on_success and :on_failure when starting this producer, or change them for each message through Broadway.Message.configure_ack/2.
The following values are supported by both :on_success and :on_failure:
:ack - Acknowledge the message. Pub/Sub can remove the message from the subscription.
:noop - Do nothing. No requests will be made to Pub/Sub, and the message will be rescheduled according to the subscription-level ackDeadlineSeconds.
:nack - Make a request to Pub/Sub to set ackDeadlineSeconds to 0, which may cause the message to be immediately redelivered to another connected consumer. Note that this does not modify the subscription-level ackDeadlineSeconds used for subsequent messages.
{:nack, integer} - Modifies the ackDeadlineSeconds for a particular message. Note that this does not modify the subscription-level ackDeadlineSeconds used for subsequent messages.
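To make the per-message override more concrete, here is a minimal sketch that picks an :on_failure value based on why processing failed. The MyApp.AckPolicy module, the failure reasons, and the 60-second deadline are hypothetical choices for illustration. Only Broadway.Message.configure_ack/2 and the values listed above come from the documentation.

```elixir
defmodule MyApp.AckPolicy do
  @moduledoc false

  # Hypothetical policy: timeouts are nacked so they can be redelivered right
  # away, while any other failure gets a 60-second ack deadline on that message.
  def on_failure(:timeout), do: :nack
  def on_failure(_reason), do: {:nack, 60}

  # Applies the chosen value to a single message, leaving the
  # producer-level :on_failure setting untouched.
  def apply_to(message, reason) do
    Broadway.Message.configure_ack(message, on_failure: on_failure(reason))
  end
end
```

Inside handle_message/3, one option is to call MyApp.AckPolicy.apply_to(message, reason) and then mark the message as failed with Broadway.Message.failed/2.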
Batching
Even if you are not interested in working with Broadway batches via the handle_batch/4 callback, we recommend that all Broadway pipelines with Pub/Sub producers define a default batcher with batch_size set to 10, so that messages can be acknowledged in batches, which improves performance and reduces the cost of integrating with Google Cloud Pub/Sub. In addition, you should ensure that batch_timeout is set to a value less than the acknowledgement deadline on the subscription. Otherwise, you could have messages that remain in the subscription and are never acknowledged successfully.
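The constraint between batch_timeout and the acknowledgement deadline can be checked when the pipeline is configured. The sketch below is one possible guard, not something the library provides. MyApp.BatcherConfig is a hypothetical helper, and it assumes you already know the subscription's acknowledgement deadline in seconds.

```elixir
defmodule MyApp.BatcherConfig do
  @moduledoc false

  # Returns options for the default batcher, raising if batch_timeout
  # (milliseconds) is not below the ack deadline (seconds).
  def default_batcher(ack_deadline_seconds, batch_timeout_ms \\ 2_000)
      when is_integer(ack_deadline_seconds) and is_integer(batch_timeout_ms) do
    if batch_timeout_ms >= ack_deadline_seconds * 1_000 do
      raise ArgumentError,
            "batch_timeout (#{batch_timeout_ms} ms) must be less than the " <>
              "acknowledgement deadline (#{ack_deadline_seconds} s)"
    end

    [batch_size: 10, batch_timeout: batch_timeout_ms]
  end
end
```

For a subscription with a 10-second deadline, MyApp.BatcherConfig.default_batcher(10) returns the batch_size and batch_timeout pair used in the example configuration.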
Example
Broadway.start_link(MyBroadway,
  name: MyBroadway,
  producer: [
    module: {BroadwayCloudPubSub.Producer,
      goth: MyApp.Goth,
      subscription: "projects/my-project/subscriptions/my_subscription"
    }
  ],
  processors: [
    default: []
  ],
  batchers: [
    default: [
      batch_size: 10,
      batch_timeout: 2_000
    ]
  ]
)
The above configuration will set up a producer that continuously receives messages from "projects/my-project/subscriptions/my_subscription" and sends them downstream.
Telemetry
This producer emits a few Telemetry events which are listed below.
[:broadway_cloud_pub_sub, :pull_client, :receive_messages, :start | :stop | :exception] spans - these events are emitted in "span style" when executing pull requests to GCP PubSub. See :telemetry.span/3.
All these events have the measurements described in :telemetry.span/3. The events contain the following metadata:
  :max_messages - the number of messages requested after applying the max_messages config option to the existing demand
  :demand - the total demand accumulated into the producer
  :name - the name of the Broadway topology

[:broadway_cloud_pub_sub, :pull_client, :ack, :start | :stop | :exception] span - these events are emitted in "span style" when acking messages on GCP PubSub. See :telemetry.span/3.
All these events have the measurements described in :telemetry.span/3. The events contain the following metadata:
  :name - the name of the Broadway topology
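As a hedged example of consuming these events, the sketch below attaches a single handler to the two :stop events and logs how long each span took. The MyApp.PubSubTelemetry module, the handler id, and the log format are assumptions for illustration. It relies on the :duration measurement that :telemetry.span/3 reports in native time units on :stop events.

```elixir
defmodule MyApp.PubSubTelemetry do
  @moduledoc false
  require Logger

  @events [
    [:broadway_cloud_pub_sub, :pull_client, :receive_messages, :stop],
    [:broadway_cloud_pub_sub, :pull_client, :ack, :stop]
  ]

  # Call once at application start, before the pipeline begins pulling.
  def attach do
    :telemetry.attach_many("my-app-pubsub-telemetry", @events, &__MODULE__.handle_event/4, nil)
  end

  # Telemetry handler: logs one line per completed span.
  def handle_event(event, measurements, metadata, _config) do
    Logger.info(format(event, measurements, metadata))
  end

  # Builds the log line; the duration is converted from native units to milliseconds.
  def format(event, %{duration: duration}, metadata) do
    ms = System.convert_time_unit(duration, :native, :millisecond)
    "#{inspect(event)} name=#{inspect(metadata[:name])} duration_ms=#{ms}"
  end
end
```

Keeping the formatting in a separate function makes the handler easy to check without a running pipeline.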