BroadwayCloudPubSub.Producer (BroadwayCloudPubSub v0.9.1)

A GenStage producer that continuously receives messages from a Google Cloud Pub/Sub topic and acknowledges them after they have been successfully processed.

By default, this producer uses BroadwayCloudPubSub.PullClient to talk to Cloud Pub/Sub, but you can provide your own client by implementing the BroadwayCloudPubSub.Client behaviour.

For a quick introduction to using Broadway with Cloud Pub/Sub, see the Google Cloud Pub/Sub Guide.

Options

Aside from :receive_interval and :client, which are generic and apply to all producers (regardless of the client implementation), all other options are specific to BroadwayCloudPubSub.PullClient, which is the default client.

  • :client - A module that implements the BroadwayCloudPubSub.Client behaviour. This module is responsible for fetching and acknowledging the messages. Note that all options passed to the producer are forwarded to the client; it is up to the client to normalize the options it needs. The default value is BroadwayCloudPubSub.PullClient.

  • :subscription - Required. The name of the subscription, including the project. For example, if your project is named "my-project" and your subscription is named "my-subscription", then your subscription name is "projects/my-project/subscriptions/my-subscription".

  • :max_number_of_messages - The maximum number of messages to be fetched per request. The default value is 10.

  • :on_failure - Configures the acking behaviour for failed messages. See the "Acknowledgements" section below for all the possible values. This option can also be changed for each message through Broadway.Message.configure_ack/2. The default value is :noop.

  • :on_success - Configures the acking behaviour for successful messages. See the "Acknowledgements" section below for all the possible values. This option can also be changed for each message through Broadway.Message.configure_ack/2. The default value is :ack.

  • :receive_interval - The duration (in milliseconds) for which the producer waits before making a request for more messages. The default value is 5000.

  • :receive_timeout - The maximum time (in milliseconds) to wait for a response before the pull client returns an error. The default value is :infinity.

  • :goth - The Goth module to use for authentication. Note that this option only applies to the default token generator.

  • :token_generator - An MFArgs tuple that will be called before each request to fetch an authentication token. It should return {:ok, String.t()} | {:error, any()}. By default this will invoke Goth.fetch/1 with the :goth option. See the "Custom token generator" section below for more information.

  • :base_url - The base URL for the Cloud Pub/Sub service. This option is mostly useful for testing via the Pub/Sub emulator. The default value is "https://pubsub.googleapis.com".

  • :finch - The name of the Finch pool. If no name is provided, then a default pool will be started by the pipeline's supervisor. The default value is nil.
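
As a rough illustration of how these options fit together, the sketch below builds the fully qualified subscription name and a producer option list. The MyApp.PubSubConfig module and its functions are hypothetical helpers, not part of BroadwayCloudPubSub. The option keys and default values are the ones documented above, and the {:nack, 60} value is only an example choice.

```elixir
defmodule MyApp.PubSubConfig do
  # Hypothetical helper module; not part of BroadwayCloudPubSub.

  @doc "Builds the fully qualified subscription name expected by :subscription."
  def subscription_name(project, subscription) do
    "projects/#{project}/subscriptions/#{subscription}"
  end

  @doc "Returns a keyword list of producer options for the default pull client."
  def producer_opts(project, subscription) do
    [
      goth: MyApp.Goth,
      subscription: subscription_name(project, subscription),
      # Documented defaults, spelled out for clarity.
      max_number_of_messages: 10,
      receive_interval: 5_000,
      on_success: :ack,
      # Example choice: set a failed message's ack deadline to 60 seconds.
      on_failure: {:nack, 60}
    ]
  end
end

opts = MyApp.PubSubConfig.producer_opts("my-project", "my-subscription")
IO.inspect(opts[:subscription])
```

The resulting list can be passed as the second element of the producer's module tuple, for example module: {BroadwayCloudPubSub.Producer, opts}.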

Custom token generator

A custom token generator can be given as an MFArgs tuple.

For example, define a MyApp.fetch_token/0 function:

defmodule MyApp do

  @doc "Fetches a Google auth token"
  def fetch_token do
    with {:ok, token} <- Goth.fetch(MyApp.Goth) do
      {:ok, token.token}
    end
  end
end

and configure the producer to use it:

token_generator: {MyApp, :fetch_token, []}
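
To make the MFArgs convention concrete, the sketch below shows how a {module, function, args} tuple can be invoked with apply/3. It assumes the producer calls the tuple in an equivalent way. The MyApp.StaticToken module and its fixed token are hypothetical stand-ins of the kind one might use in tests, where no real credentials are involved.

```elixir
defmodule MyApp.StaticToken do
  # Hypothetical token generator for tests; returns a fixed, fake token.
  def fetch_token(token \\ "fake-token") do
    {:ok, token}
  end
end

# An MFArgs tuple: module, function name, and the list of arguments.
token_generator = {MyApp.StaticToken, :fetch_token, ["test-token"]}

# Assumption: the producer invokes the tuple in a way equivalent to apply/3.
{module, function, args} = token_generator
{:ok, token} = apply(module, function, args)
IO.puts(token)
```

With an empty argument list, as in {MyApp.StaticToken, :fetch_token, []}, the function is called with no arguments and falls back to its default token.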

Acknowledgements

You can use the :on_success and :on_failure options to control how messages are acknowledged with the Pub/Sub system.

By default, successful messages are acknowledged and failed messages are ignored. You can set :on_success and :on_failure when starting this producer, or change them for each message through Broadway.Message.configure_ack/2.

The following values are supported by both :on_success and :on_failure:

  • :ack - Acknowledge the message. Pub/Sub can remove the message from the subscription.

  • :noop - Do nothing. No requests will be made to Pub/Sub, and the message will be rescheduled according to the subscription-level ackDeadlineSeconds.

  • :nack - Make a request to Pub/Sub to set ackDeadlineSeconds to 0, which may cause the message to be immediately redelivered to another connected consumer. Note that this does not modify the subscription-level ackDeadlineSeconds used for subsequent messages.

  • {:nack, integer} - Modifies the ackDeadlineSeconds for a particular message. Note that this does not modify the subscription-level ackDeadlineSeconds used for subsequent messages.
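
As an illustration of per-message acknowledgement, the following sketch maps a failure reason to one of the values above and applies it with Broadway.Message.configure_ack/2. The MyApp.AckPolicy module, its function names, and the failure reasons (:transient, :invalid_payload) are hypothetical; only the ack values and configure_ack/2 come from the documentation above. Broadway.Message.failed/2 is used to mark the message as failed.

```elixir
defmodule MyApp.AckPolicy do
  # Hypothetical helper; the failure reasons are illustrative only.

  @doc "Chooses an :on_failure value for a given failure reason."
  # Likely to succeed on retry: set this message's ack deadline to 30 seconds.
  def on_failure_for(:transient), do: {:nack, 30}
  # Will never succeed: acknowledge so Pub/Sub can remove the message.
  def on_failure_for(:invalid_payload), do: :ack
  # Anything else: make the message available for redelivery right away.
  def on_failure_for(_other), do: :nack

  @doc "Marks a message as failed and sets how it should be acknowledged."
  def fail(message, reason) do
    message
    |> Broadway.Message.configure_ack(on_failure: on_failure_for(reason))
    |> Broadway.Message.failed(reason)
  end
end
```

Inside a handle_message/3 callback, a call such as MyApp.AckPolicy.fail(message, :transient) would return the failed message with its acknowledgement behaviour adjusted.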

Batching

Even if you are not interested in working with Broadway batches via the handle_batch/3 callback, we recommend that all Broadway pipelines with Pub/Sub producers define a default batcher with batch_size set to 10. This allows messages to be acknowledged in batches, which improves performance and reduces the cost of integrating with Google Cloud Pub/Sub. In addition, ensure that batch_timeout is set to a value less than the acknowledgement deadline on the subscription. Otherwise, some messages could remain in the subscription and never be acknowledged successfully.
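
The constraint on batch_timeout can be checked with simple arithmetic. The sketch below is a hypothetical helper (MyApp.BatchCheck is not part of Broadway or BroadwayCloudPubSub). It assumes batch_timeout is expressed in milliseconds, as Broadway expects, and that you know the subscription's acknowledgement deadline in seconds. The 10-second deadline used here is only an example value.

```elixir
defmodule MyApp.BatchCheck do
  # Hypothetical helper; not part of Broadway or BroadwayCloudPubSub.

  @doc """
  Returns true when the batch timeout (milliseconds) is strictly less than
  the subscription's acknowledgement deadline (seconds).
  """
  def safe_batch_timeout?(batch_timeout_ms, ack_deadline_seconds) do
    batch_timeout_ms < ack_deadline_seconds * 1_000
  end
end

# Example values: a 2-second batch timeout against a 10-second deadline.
IO.inspect(MyApp.BatchCheck.safe_batch_timeout?(2_000, 10))
```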

Example

Broadway.start_link(MyBroadway,
  name: MyBroadway,
  producer: [
    module: {BroadwayCloudPubSub.Producer,
      goth: MyApp.Goth,
      subscription: "projects/my-project/subscriptions/my_subscription"
    }
  ],
  processors: [
    default: []
  ],
  batchers: [
    default: [
      batch_size: 10,
      batch_timeout: 2_000
    ]
  ]
)

The above configuration will set up a producer that continuously receives messages from "projects/my-project/subscriptions/my_subscription" and sends them downstream.

Telemetry

This producer emits the Telemetry events listed below.

  • [:broadway_cloud_pub_sub, :pull_client, :receive_messages, :start | :stop | :exception] spans - these events are emitted in "span style" when executing pull requests to GCP PubSub. See :telemetry.span/3.

    All these events have the measurements described in :telemetry.span/3. The events contain the following metadata:

    • :max_messages - the number of messages requested after applying the :max_number_of_messages config option to the existing demand
    • :demand - the total demand accumulated into the producer
    • :name - the name of the Broadway topology

  • [:broadway_cloud_pub_sub, :pull_client, :ack, :start | :stop | :exception] spans - these events are emitted in "span style" when acking messages on GCP PubSub. See :telemetry.span/3.

    All these events have the measurements described in :telemetry.span/3. The events contain the following metadata:

    • :name - the name of the Broadway topology
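
As a sketch of how these events might be consumed, the module below attaches one handler to the :stop and :exception events of both spans using :telemetry.attach_many/4 and logs how long each request took. The MyApp.PubSubTelemetry module, the handler id, and the log format are hypothetical. The code assumes the :duration measurement is expressed in native time units, as described in :telemetry.span/3.

```elixir
defmodule MyApp.PubSubTelemetry do
  # Hypothetical handler module; names and log format are illustrative.
  require Logger

  @events [
    [:broadway_cloud_pub_sub, :pull_client, :receive_messages, :stop],
    [:broadway_cloud_pub_sub, :pull_client, :receive_messages, :exception],
    [:broadway_cloud_pub_sub, :pull_client, :ack, :stop],
    [:broadway_cloud_pub_sub, :pull_client, :ack, :exception]
  ]

  @doc "Attaches the handler; call once, for example when the application starts."
  def attach do
    :telemetry.attach_many(
      "my-app-pubsub-logger",
      @events,
      &__MODULE__.handle_event/4,
      nil
    )
  end

  @doc "Telemetry handler: logs a one-line summary of the event."
  def handle_event(event, measurements, metadata, _config) do
    Logger.info(describe(event, measurements, metadata))
  end

  @doc "Builds the log line from the event name, duration, and topology name."
  def describe(
        [:broadway_cloud_pub_sub, :pull_client, operation, outcome],
        %{duration: duration},
        metadata
      ) do
    # Span durations are reported in native time units.
    ms = System.convert_time_unit(duration, :native, :millisecond)
    "#{inspect(metadata[:name])} #{operation} #{outcome} in #{ms}ms"
  end
end
```

Calling MyApp.PubSubTelemetry.attach() before the pipeline starts would register the handler. The :start events are left out here because they do not carry a :duration measurement.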