BroadwayCloudPubSub.Producer (BroadwayCloudPubSub v0.6.2) View Source

A GenStage producer that continuously receives messages from a Google Cloud Pub/Sub topic and acknowledges them after being successfully processed.

By default this producer uses BroadwayCloudPubSub.GoogleApiClient to talk to Cloud Pub/Sub, but you can provide your client by implementing the BroadwayCloudPubSub.Client behaviour.

Options using BroadwayCloudPubSub.GoogleApiClient

  • :subscription - Required. The name of the subscription. Example: "projects/my-project/subscriptions/my-subscription"

  • :max_number_of_messages - Optional. The maximum number of messages to be fetched per request. Default is 10.

  • :return_immediately - Optional. If this field set to true, the system will respond immediately even if it there are no messages available to return in the Pull response. Otherwise, the system may wait (for a bounded amount of time) until at least one message is available, rather than returning no messages. Default is nil.

  • :scope - Optional. A string representing the scope or scopes to use when fetching an access token. Default is "https://www.googleapis.com/auth/pubsub". Note: The :scope option only applies to the default token generator.

  • :token_generator - Optional. An MFArgs tuple that will be called before each request to fetch an authentication token. It should return {:ok, String.t()} | {:error, any()}. Default generator uses Goth.Token.for_scope/1 with "https://www.googleapis.com/auth/pubsub".

  • :pool_opts - Optional. A set of additional options to override the default :hackney_pool configuration options.

Acknowledger options

These options apply to BroadwayCloudPubSub.GoogleApiClient acknowledgement API:

  • :on_success - Optional. Configures the behaviour for successful messages. See the "Acknowledgements" section below for all the possible values. This option can also be changed for each message through Broadway.Message.configure_ack/2. Default is :ack.

  • :on_failure - Optional. Configures the behaviour for failed messages. See the "Acknowledgements" section below for all the possible values. This option can also be changed for each message through Broadway.Message.configure_ack/2. Default is :noop.

Additional options

These options apply to all producers, regardless of client implementation:

  • :client - Optional. A module that implements the BroadwayCloudPubSub.Client behaviour. This module is responsible for fetching and acknowledging the messages. Pay attention that all options passed to the producer will be forwarded to the client. It's up to the client to normalize the options it needs. Default is BroadwayCloudPubSub.GoogleApiClient.

  • :pool_size - Optional. The size of the connection pool. Default is twice the producer concurrency.

  • :receive_interval - Optional. The duration (in milliseconds) for which the producer waits before making a request for more messages. Default is 5000.

Acknowledgements

You can use the :on_success and :on_failure options to control how messages are acknowledged with the Pub/Sub system.

By default successful messages are acknowledged and failed messages are ignored. You can set :on_success and :on_failure when starting this producer, or change them for each message through Broadway.Message.configure_ack/2.

The following values are supported by both :on_success and :on_failure:

  • :ack - Acknowledge the message. Pub/Sub can remove the message from the subscription.

  • :noop - Do nothing. No requests will be made to Pub/Sub, and the message will be rescheduled according to the subscription-level ackDeadlineSeconds.

  • :nack - Make a request to Pub/Sub to set ackDeadlineSeconds to 0, which may cause the message to be immediately redelivered to another connected consumer. Note that this does not modify the subscription-level ackDeadlineSeconds used for subsequent messages.

  • {:nack, integer} - Modifies the ackDeadlineSeconds for a particular message. Note that this does not modify the subscription-level ackDeadlineSeconds used for subsequent messages.

Batching

Even if you are not interested in working with Broadway batches via the handle_batch/3 callback, we recommend all Broadway pipelines with Pub/Sub producers to define a default batcher with batch_size set to 10, so messages can be acknowledged in batches, which improves the performance and reduces the cost of integrating with Google Cloud Pub/Sub.

Example

Broadway.start_link(MyBroadway,
  name: MyBroadway,
  producer: [
    module: {BroadwayCloudPubSub.Producer,
      subscription: "projects/my-project/subscriptions/my_subscription"
    }
  ],
  processors: [
    default: []
  ],
  batchers: [
    default: [
      batch_size: 10,
      batch_timeout: 2_000
    ]
  ]
)

The above configuration will set up a producer that continuously receives messages from "projects/my-project/subscriptions/my_subscription" and sends them downstream.