PubsubGrpc (PubsubGrpc v0.3.0)

Main entry point for Google Cloud Pub/Sub gRPC operations with connection pooling.

This module provides a convenient API for common Pub/Sub operations like creating topics, publishing messages, pulling messages, and managing subscriptions using a connection pool powered by Poolex.

Configuration

Production (Google Cloud)

For production use, the library connects to pubsub.googleapis.com:443 and supports multiple authentication methods:

Option 1: Goth Library (Recommended)

# Add to your supervision tree
children = [
  {Goth, name: MyApp.Goth, source: {:service_account, credentials}},
  # ... other children
]

# Configure PubsubGrpc to use Goth
config :pubsub_grpc, :goth, MyApp.Goth

Option 2: gcloud CLI

# Authenticate using gcloud
gcloud auth application-default login

Option 3: Service Account Key

# Set environment variable
export GOOGLE_APPLICATION_CREDENTIALS="/path/to/service-account-key.json"

Option 4: Google Cloud Environment

# Automatically works on GCE, GKE, Cloud Run, etc.

Development/Test (Local Emulator)

# config/dev.exs
config :pubsub_grpc, :emulator,
  project_id: "my-project-id",
  host: "localhost",
  port: 8085

Examples

# Create a topic
{:ok, topic} = PubsubGrpc.create_topic("my-project", "my-topic")

# Publish messages
messages = [%{data: "Hello", attributes: %{"source" => "app"}}]
{:ok, response} = PubsubGrpc.publish("my-project", "my-topic", messages)

# Create subscription
{:ok, subscription} = PubsubGrpc.create_subscription("my-project", "my-topic", "my-sub")

# Pull messages
{:ok, messages} = PubsubGrpc.pull("my-project", "my-sub", 10)

# Acknowledge messages
ack_ids = Enum.map(messages, & &1.ack_id)
:ok = PubsubGrpc.acknowledge("my-project", "my-sub", ack_ids)

# Schema management (v0.3.1+)
{:ok, schemas} = PubsubGrpc.list_schemas("my-project")
{:ok, schema} = PubsubGrpc.get_schema("my-project", "my-schema")

Summary

Functions

Acknowledges received messages.

Creates a new Pub/Sub topic.

Deletes a Pub/Sub topic.

Executes a custom operation using a connection from the pool.

Gets details of a specific schema.

Lists schemas in a project.

Lists topics in a project.

Publishes messages to a topic.

Convenience function to publish a single message.

Pulls messages from a subscription.

Validates a schema definition.

Executes multiple operations using the same connection.

Functions

acknowledge(project_id, subscription_id, ack_ids)

Acknowledges received messages.

Parameters

  • project_id: The Google Cloud project ID
  • subscription_id: The subscription identifier
  • ack_ids: List of acknowledgment IDs from received messages

Returns

  • :ok - Successfully acknowledged messages
  • {:error, reason} - Error acknowledging messages

Examples

{:ok, messages} = PubsubGrpc.pull("my-project", "my-sub")
ack_ids = Enum.map(messages, & &1.ack_id)
:ok = PubsubGrpc.acknowledge("my-project", "my-sub", ack_ids)

create_schema(project_id, schema_id, type, definition)

Creates a new schema.

Parameters

  • project_id: The Google Cloud project ID
  • schema_id: The schema identifier
  • type: Schema type (:protocol_buffer or :avro)
  • definition: The schema definition string

Returns

  • {:ok, schema} - Created schema
  • {:error, reason} - Error creating schema

Examples

definition = '''
syntax = "proto3";
message User { string name = 1; }
'''

{:ok, schema} = PubsubGrpc.create_schema("my-project", "user-schema", :protocol_buffer, definition)

create_subscription(project_id, topic_id, subscription_id, opts \\ [])

Creates a subscription to a topic.

Parameters

  • project_id: The Google Cloud project ID
  • topic_id: The topic identifier
  • subscription_id: The subscription identifier
  • opts: Optional parameters
    • :ack_deadline_seconds: Message acknowledgment deadline (default: 60)

Returns

  • {:ok, subscription} - Successfully created subscription
  • {:error, reason} - Error creating subscription

Examples

{:ok, sub} = PubsubGrpc.create_subscription("my-project", "my-topic", "my-sub")
{:ok, sub} = PubsubGrpc.create_subscription("my-project", "my-topic", "my-sub", ack_deadline_seconds: 30)

create_topic(project_id, topic_id)

Creates a new Pub/Sub topic.

Parameters

  • project_id: The Google Cloud project ID
  • topic_id: The topic identifier (without the full path)

Returns

  • {:ok, topic} - Successfully created topic
  • {:error, reason} - Error creating topic

Examples

{:ok, topic} = PubsubGrpc.create_topic("my-project", "events")
{:error, %GRPC.RPCError{status: 6}} = PubsubGrpc.create_topic("my-project", "existing-topic")

delete_schema(project_id, schema_id)

Deletes a schema.

Parameters

  • project_id: The Google Cloud project ID
  • schema_id: The schema identifier

Returns

  • :ok - Successfully deleted schema
  • {:error, reason} - Error deleting schema

Examples

:ok = PubsubGrpc.delete_schema("my-project", "old-schema")

delete_subscription(project_id, subscription_id)

Deletes a subscription.

Parameters

  • project_id: The Google Cloud project ID
  • subscription_id: The subscription identifier

Returns

  • :ok - Successfully deleted subscription
  • {:error, reason} - Error deleting subscription

delete_topic(project_id, topic_id)

Deletes a Pub/Sub topic.

Parameters

  • project_id: The Google Cloud project ID
  • topic_id: The topic identifier

Returns

  • :ok - Successfully deleted topic
  • {:error, reason} - Error deleting topic

execute(operation_fn, opts \\ [])

Executes a custom operation using a connection from the pool.

This function allows you to execute custom Pub/Sub operations that aren't covered by the convenience functions above.

Parameters

  • operation_fn: A function that takes (channel) and returns the operation result
  • opts: Optional parameters
    • :pool - Pool name to use (default: PubsubGrpc.ConnectionPool)
    • :checkout_timeout - Timeout for checking out connections

Returns

  • Operation result from the GRPC call
  • {:error, reason} - Error executing operation

Examples

# Custom operation
operation = fn channel ->
  request = %Google.Pubsub.V1.GetTopicRequest{topic: "projects/my-project/topics/my-topic"}
  auth_opts = PubsubGrpc.Auth.request_opts()
  Google.Pubsub.V1.Publisher.Stub.get_topic(channel, request, auth_opts)
end

{:ok, topic} = PubsubGrpc.execute(operation)

get_schema(project_id, schema_id, opts \\ [])

Gets details of a specific schema.

Parameters

  • project_id: The Google Cloud project ID
  • schema_id: The schema identifier
  • opts: Optional parameters
    • :view - Schema view level (:basic or :full, default: :full)

Returns

  • {:ok, schema} - Schema details
  • {:error, reason} - Error getting schema

Examples

{:ok, schema} = PubsubGrpc.get_schema("my-project", "my-schema")
IO.puts("Schema definition: #{schema.definition}")

list_schema_revisions(project_id, schema_id, opts \\ [])

Lists revisions of a schema.

Parameters

  • project_id: The Google Cloud project ID
  • schema_id: The schema identifier
  • opts: Optional parameters
    • :view - Schema view level (:basic or :full, default: :basic)
    • :page_size - Maximum number of revisions to return
    • :page_token - Token for pagination

Returns

  • {:ok, %{schemas: schema_revisions, next_page_token: token}} - List of schema revisions
  • {:error, reason} - Error listing schema revisions

Examples

{:ok, result} = PubsubGrpc.list_schema_revisions("my-project", "my-schema")

list_schemas(project_id, opts \\ [])

Lists schemas in a project.

Parameters

  • project_id: The Google Cloud project ID
  • opts: Optional parameters
    • :view - Schema view level (:basic or :full, default: :basic)
    • :page_size - Maximum number of schemas to return
    • :page_token - Token for pagination

Returns

  • {:ok, %{schemas: schemas, next_page_token: token}} - List of schemas
  • {:error, reason} - Error listing schemas

Examples

{:ok, result} = PubsubGrpc.list_schemas("my-project")
Enum.each(result.schemas, fn schema ->
  IO.puts("Schema: #{schema.name} (#{schema.type})")
end)

list_topics(project_id, opts \\ [])

Lists topics in a project.

Parameters

  • project_id: The Google Cloud project ID
  • opts: Optional parameters
    • :page_size: Maximum number of topics to return
    • :page_token: Token for pagination

Returns

  • {:ok, %{topics: topics, next_page_token: token}} - List of topics
  • {:error, reason} - Error listing topics

publish(project_id, topic_id, messages)

Publishes messages to a topic.

Parameters

  • project_id: The Google Cloud project ID
  • topic_id: The topic identifier
  • messages: List of message maps with :data and optional :attributes

Message Format

%{data: "message content", attributes: %{"key" => "value"}}

Returns

  • {:ok, %{message_ids: [message_ids]}} - Successfully published messages
  • {:error, reason} - Error publishing messages

Examples

messages = [
  %{data: "Hello World", attributes: %{"source" => "app"}},
  %{data: "Another message"}
]
{:ok, response} = PubsubGrpc.publish("my-project", "my-topic", messages)

publish_message(project_id, topic_id, data, attributes \\ %{})

Convenience function to publish a single message.

Parameters

  • project_id: The Google Cloud project ID
  • topic_id: The topic identifier
  • data: Message data (string)
  • attributes: Optional message attributes (map)

Returns

  • {:ok, %{message_ids: [message_id]}} - Successfully published message
  • {:error, reason} - Error publishing message

Examples

{:ok, response} = PubsubGrpc.publish_message("my-project", "my-topic", "Hello!")
{:ok, response} = PubsubGrpc.publish_message("my-project", "my-topic", "Hello!", %{"source" => "app"})

pull(project_id, subscription_id, max_messages \\ 10)

Pulls messages from a subscription.

Parameters

  • project_id: The Google Cloud project ID
  • subscription_id: The subscription identifier
  • max_messages: Maximum number of messages to pull (default: 10)

Returns

  • {:ok, messages} - List of received messages
  • {:error, reason} - Error pulling messages

Examples

{:ok, messages} = PubsubGrpc.pull("my-project", "my-sub")
{:ok, messages} = PubsubGrpc.pull("my-project", "my-sub", 5)

# Process messages
Enum.each(messages, fn received_msg ->
  IO.puts("Received: #{received_msg.message.data}")
  # Remember to acknowledge: PubsubGrpc.acknowledge(project, sub, [received_msg.ack_id])
end)

validate_schema(project_id, type, definition)

Validates a schema definition.

Parameters

  • project_id: The Google Cloud project ID
  • type: Schema type (:protocol_buffer or :avro)
  • definition: The schema definition string

Returns

  • {:ok, validation_result} - Schema is valid
  • {:error, reason} - Validation error

Examples

definition = '''
syntax = "proto3";
message User { string name = 1; }
'''

{:ok, result} = PubsubGrpc.validate_schema("my-project", :protocol_buffer, definition)

with_connection(fun)

Executes multiple operations using the same connection.

This is more efficient when you need to perform several operations in sequence as it reuses the same connection instead of checking out a new one each time.

Parameters

  • fun: A function that receives a channel and performs multiple operations

Returns

  • Result of the function execution
  • {:error, reason} - Error executing operations

Examples

result = PubsubGrpc.with_connection(fn channel ->
  auth_opts = PubsubGrpc.Auth.request_opts()

  # Create topic
  topic_req = %Google.Pubsub.V1.Topic{name: "projects/my-project/topics/batch-topic"}
  {:ok, _topic} = Google.Pubsub.V1.Publisher.Stub.create_topic(channel, topic_req, auth_opts)

  # Publish message
  msg = %Google.Pubsub.V1.PubsubMessage{data: "Batch message"}
  pub_req = %Google.Pubsub.V1.PublishRequest{
    topic: "projects/my-project/topics/batch-topic",
    messages: [msg]
  }
  Google.Pubsub.V1.Publisher.Stub.publish(channel, pub_req, auth_opts)
end)