PubsubGrpc (PubsubGrpc v0.3.0)
Main entry point for Google Cloud Pub/Sub gRPC operations with connection pooling.
This module provides a convenient API for common Pub/Sub operations like creating topics, publishing messages, pulling messages, and managing subscriptions using a connection pool powered by Poolex.
Configuration
Production (Google Cloud)
For production use, the library connects to pubsub.googleapis.com:443 and supports multiple authentication methods:
Option 1: Goth Library (Recommended)
# Add to your supervision tree.
# `credentials` is the decoded service account JSON (a map).
children = [
  {Goth, name: MyApp.Goth, source: {:service_account, credentials}},
  # ... other children
]
# Configure PubsubGrpc to use Goth
config :pubsub_grpc, :goth, MyApp.Goth
Option 2: gcloud CLI
# Authenticate using gcloud
gcloud auth application-default login
Option 3: Service Account Key
# Set environment variable
export GOOGLE_APPLICATION_CREDENTIALS="/path/to/service-account-key.json"
Option 4: Google Cloud Environment
# Automatically works on GCE, GKE, Cloud Run, etc.
Development/Test (Local Emulator)
# config/dev.exs
config :pubsub_grpc, :emulator,
  project_id: "my-project-id",
  host: "localhost",
  port: 8085
Examples
# Create a topic
{:ok, topic} = PubsubGrpc.create_topic("my-project", "my-topic")
# Publish messages
messages = [%{data: "Hello", attributes: %{"source" => "app"}}]
{:ok, response} = PubsubGrpc.publish("my-project", "my-topic", messages)
# Create subscription
{:ok, subscription} = PubsubGrpc.create_subscription("my-project", "my-topic", "my-sub")
# Pull messages
{:ok, messages} = PubsubGrpc.pull("my-project", "my-sub", 10)
# Acknowledge messages
ack_ids = Enum.map(messages, & &1.ack_id)
:ok = PubsubGrpc.acknowledge("my-project", "my-sub", ack_ids)
# Schema management (v0.3.1+)
{:ok, schemas} = PubsubGrpc.list_schemas("my-project")
{:ok, schema} = PubsubGrpc.get_schema("my-project", "my-schema")
Summary
Functions
Acknowledges received messages.
Creates a new schema.
Creates a subscription to a topic.
Creates a new Pub/Sub topic.
Deletes a schema.
Deletes a subscription.
Deletes a Pub/Sub topic.
Executes a custom operation using a connection from the pool.
Gets details of a specific schema.
Lists revisions of a schema.
Lists schemas in a project.
Lists topics in a project.
Publishes messages to a topic.
Convenience function to publish a single message.
Pulls messages from a subscription.
Validates a schema definition.
Executes multiple operations using the same connection.
Functions
Acknowledges received messages.
Parameters
- project_id: The Google Cloud project ID
- subscription_id: The subscription identifier
- ack_ids: List of acknowledgment IDs from received messages
Returns
- :ok - Successfully acknowledged messages
- {:error, reason} - Error acknowledging messages
Examples
{:ok, messages} = PubsubGrpc.pull("my-project", "my-sub")
ack_ids = Enum.map(messages, & &1.ack_id)
:ok = PubsubGrpc.acknowledge("my-project", "my-sub", ack_ids)
Creates a new schema.
Parameters
- project_id: The Google Cloud project ID
- schema_id: The schema identifier
- type: Schema type (:protocol_buffer or :avro)
- definition: The schema definition string
Returns
- {:ok, schema} - Created schema
- {:error, reason} - Error creating schema
Examples
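# Use a double-quoted heredoc so the definition is a string (binary);
# a ''' heredoc would produce a charlist instead.
definition = """
syntax = "proto3";
message User { string name = 1; }
"""
{:ok, schema} = PubsubGrpc.create_schema("my-project", "user-schema", :protocol_buffer, definition)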
Creates a subscription to a topic.
Parameters
- project_id: The Google Cloud project ID
- topic_id: The topic identifier
- subscription_id: The subscription identifier
- opts: Optional parameters
  - :ack_deadline_seconds: Message acknowledgment deadline (default: 60)
Returns
- {:ok, subscription} - Successfully created subscription
- {:error, reason} - Error creating subscription
Examples
{:ok, sub} = PubsubGrpc.create_subscription("my-project", "my-topic", "my-sub")
{:ok, sub} = PubsubGrpc.create_subscription("my-project", "my-topic", "my-sub", ack_deadline_seconds: 30)
Creates a new Pub/Sub topic.
Parameters
- project_id: The Google Cloud project ID
- topic_id: The topic identifier (without the full path)
Returns
- {:ok, topic} - Successfully created topic
- {:error, reason} - Error creating topic
Examples
{:ok, topic} = PubsubGrpc.create_topic("my-project", "events")
{:error, %GRPC.RPCError{status: 6}} = PubsubGrpc.create_topic("my-project", "existing-topic")
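gRPC status code 6 is ALREADY_EXISTS, so the second example above is the "topic is already there" case. A minimal sketch of treating that case as success during setup is shown below. The `TopicSetup` module and `ensure_topic/1` are hypothetical names used only for illustration, not part of PubsubGrpc; the function takes the create call as an argument so it only assumes the `{:ok, topic}` / `{:error, reason}` return shapes documented above.

```elixir
defmodule TopicSetup do
  # Hypothetical helper, not part of PubsubGrpc.
  # gRPC status code 6 is ALREADY_EXISTS.
  @already_exists 6

  # `create_fun` is a zero-arity function that returns {:ok, topic} or
  # {:error, reason}, e.g. a wrapper around PubsubGrpc.create_topic/2.
  def ensure_topic(create_fun) when is_function(create_fun, 0) do
    case create_fun.() do
      {:ok, topic} -> {:ok, {:created, topic}}
      # Matches any error map or struct carrying status 6.
      {:error, %{status: @already_exists}} -> {:ok, :already_exists}
      {:error, reason} -> {:error, reason}
    end
  end
end
```

It could be called as `TopicSetup.ensure_topic(fn -> PubsubGrpc.create_topic("my-project", "events") end)`; any other error is passed through unchanged.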
Deletes a schema.
Parameters
- project_id: The Google Cloud project ID
- schema_id: The schema identifier
Returns
- :ok - Successfully deleted schema
- {:error, reason} - Error deleting schema
Examples
:ok = PubsubGrpc.delete_schema("my-project", "old-schema")
Deletes a subscription.
Parameters
- project_id: The Google Cloud project ID
- subscription_id: The subscription identifier
Returns
- :ok - Successfully deleted subscription
- {:error, reason} - Error deleting subscription
Deletes a Pub/Sub topic.
Parameters
- project_id: The Google Cloud project ID
- topic_id: The topic identifier
Returns
- :ok - Successfully deleted topic
- {:error, reason} - Error deleting topic
Executes a custom operation using a connection from the pool.
This function allows you to execute custom Pub/Sub operations that aren't covered by the convenience functions above.
Parameters
- operation_fn: A function that takes (channel) and returns the operation result
- opts: Optional parameters
  - :pool - Pool name to use (default: PubsubGrpc.ConnectionPool)
  - :checkout_timeout - Timeout for checking out connections
Returns
- Operation result from the gRPC call
- {:error, reason} - Error executing operation
Examples
# Custom operation
operation = fn channel ->
  request = %Google.Pubsub.V1.GetTopicRequest{topic: "projects/my-project/topics/my-topic"}
  auth_opts = PubsubGrpc.Auth.request_opts()
  Google.Pubsub.V1.Publisher.Stub.get_topic(channel, request, auth_opts)
end
{:ok, topic} = PubsubGrpc.execute(operation)
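Unlike the convenience functions, which take a project ID and a short identifier, raw requests such as the one above carry the full resource name. A small sketch of building those names follows. The `ResourcePath` module is a hypothetical helper for illustration, not part of PubsubGrpc; the `projects/{project}/topics/{topic}` form comes from the example above, and the subscription form follows the same Pub/Sub naming scheme.

```elixir
defmodule ResourcePath do
  # Hypothetical helper, not part of PubsubGrpc.

  # Full topic name, e.g. "projects/my-project/topics/my-topic".
  def topic(project_id, topic_id) do
    "projects/#{project_id}/topics/#{topic_id}"
  end

  # Full subscription name, e.g. "projects/my-project/subscriptions/my-sub".
  def subscription(project_id, subscription_id) do
    "projects/#{project_id}/subscriptions/#{subscription_id}"
  end
end
```

With this in place, the request could be written as `%Google.Pubsub.V1.GetTopicRequest{topic: ResourcePath.topic("my-project", "my-topic")}`.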
Gets details of a specific schema.
Parameters
- project_id: The Google Cloud project ID
- schema_id: The schema identifier
- opts: Optional parameters
  - :view - Schema view level (:basic or :full, default: :full)
Returns
- {:ok, schema} - Schema details
- {:error, reason} - Error getting schema
Examples
{:ok, schema} = PubsubGrpc.get_schema("my-project", "my-schema")
IO.puts("Schema definition: #{schema.definition}")
Lists revisions of a schema.
Parameters
- project_id: The Google Cloud project ID
- schema_id: The schema identifier
- opts: Optional parameters
  - :view - Schema view level (:basic or :full, default: :basic)
  - :page_size - Maximum number of revisions to return
  - :page_token - Token for pagination
Returns
- {:ok, %{schemas: schema_revisions, next_page_token: token}} - List of schema revisions
- {:error, reason} - Error listing schema revisions
Examples
{:ok, result} = PubsubGrpc.list_schema_revisions("my-project", "my-schema")
Lists schemas in a project.
Parameters
- project_id: The Google Cloud project ID
- opts: Optional parameters
  - :view - Schema view level (:basic or :full, default: :basic)
  - :page_size - Maximum number of schemas to return
  - :page_token - Token for pagination
Returns
- {:ok, %{schemas: schemas, next_page_token: token}} - List of schemas
- {:error, reason} - Error listing schemas
Examples
{:ok, result} = PubsubGrpc.list_schemas("my-project")
Enum.each(result.schemas, fn schema ->
  IO.puts("Schema: #{schema.name} (#{schema.type})")
end)
Lists topics in a project.
Parameters
- project_id: The Google Cloud project ID
- opts: Optional parameters
  - :page_size: Maximum number of topics to return
  - :page_token: Token for pagination
Returns
- {:ok, %{topics: topics, next_page_token: token}} - List of topics
- {:error, reason} - Error listing topics
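The list functions return one page at a time together with a `next_page_token`. A minimal sketch of following that token until the listing is exhausted is shown below. The `Pager` module is a hypothetical helper, not part of PubsubGrpc, and it assumes that an empty or `nil` `next_page_token` means there are no further pages, which is the usual Google API convention but is not stated on this page.

```elixir
defmodule Pager do
  # Hypothetical helper, not part of PubsubGrpc.
  #
  # `fetch_page` receives a keyword list of options and returns
  # {:ok, %{items_key => items, next_page_token: token}} or {:error, reason}.
  def all(fetch_page, items_key, opts \\ []) when is_function(fetch_page, 1) do
    collect(fetch_page, items_key, opts, [])
  end

  defp collect(fetch_page, items_key, opts, acc) do
    case fetch_page.(opts) do
      {:ok, %{next_page_token: next} = page} ->
        acc = acc ++ Map.fetch!(page, items_key)

        # Assumption: an empty or nil token marks the last page.
        if next in [nil, ""] do
          {:ok, acc}
        else
          collect(fetch_page, items_key, Keyword.put(opts, :page_token, next), acc)
        end

      {:error, reason} ->
        {:error, reason}
    end
  end
end
```

Assuming the options are the second argument, as the parameter list above suggests, all topics could be collected with `Pager.all(fn opts -> PubsubGrpc.list_topics("my-project", opts) end, :topics)`. The same helper would apply to the schema listings by passing `:schemas` as the key.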
Publishes messages to a topic.
Parameters
- project_id: The Google Cloud project ID
- topic_id: The topic identifier
- messages: List of message maps with :data and optional :attributes
Message Format
%{data: "message content", attributes: %{"key" => "value"}}
Returns
- {:ok, %{message_ids: [message_ids]}} - Successfully published messages
- {:error, reason} - Error publishing messages
Examples
messages = [
  %{data: "Hello World", attributes: %{"source" => "app"}},
  %{data: "Another message"}
]
{:ok, response} = PubsubGrpc.publish("my-project", "my-topic", messages)
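Pub/Sub message attributes are string-to-string pairs, so it can help to normalize application data before building the message list. The sketch below produces maps in the message format described above. `MessageBuilder` is a hypothetical helper, not part of PubsubGrpc, and whether the library itself coerces attribute keys or values is not stated here.

```elixir
defmodule MessageBuilder do
  # Hypothetical helper, not part of PubsubGrpc.
  #
  # Builds a message map with :data and, when given, string-keyed
  # :attributes with string values.
  def build(data, attributes \\ %{}) when is_binary(data) do
    attrs = Map.new(attributes, fn {key, value} -> {to_string(key), to_string(value)} end)

    if map_size(attrs) == 0 do
      %{data: data}
    else
      %{data: data, attributes: attrs}
    end
  end
end
```

For example, `MessageBuilder.build("Hello", source: :app)` yields a map that could be placed directly in the `messages` list passed to `PubsubGrpc.publish/3`.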
Convenience function to publish a single message.
Parameters
- project_id: The Google Cloud project ID
- topic_id: The topic identifier
- data: Message data (string)
- attributes: Optional message attributes (map)
Returns
- {:ok, %{message_ids: [message_id]}} - Successfully published message
- {:error, reason} - Error publishing message
Examples
{:ok, response} = PubsubGrpc.publish_message("my-project", "my-topic", "Hello!")
{:ok, response} = PubsubGrpc.publish_message("my-project", "my-topic", "Hello!", %{"source" => "app"})
Pulls messages from a subscription.
Parameters
- project_id: The Google Cloud project ID
- subscription_id: The subscription identifier
- max_messages: Maximum number of messages to pull (default: 10)
Returns
- {:ok, messages} - List of received messages
- {:error, reason} - Error pulling messages
Examples
{:ok, messages} = PubsubGrpc.pull("my-project", "my-sub")
{:ok, messages} = PubsubGrpc.pull("my-project", "my-sub", 5)
# Process messages
Enum.each(messages, fn received_msg ->
  IO.puts("Received: #{received_msg.message.data}")
  # Remember to acknowledge: PubsubGrpc.acknowledge(project, sub, [received_msg.ack_id])
end)
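When processing can fail, it is often safer to acknowledge only the messages that were handled successfully, so the rest are redelivered. The sketch below collects the matching `ack_id` values. `AckCollector` is a hypothetical helper, not part of PubsubGrpc; it assumes only the received-message shape used in the example above (`ack_id` and `message.data`) and a handler that returns `:ok` on success.

```elixir
defmodule AckCollector do
  # Hypothetical helper, not part of PubsubGrpc.
  #
  # Runs `handler` on each message's data and returns the ack_ids of the
  # messages for which the handler returned :ok.
  def handled_ack_ids(received_messages, handler) when is_function(handler, 1) do
    for %{ack_id: ack_id, message: %{data: data}} <- received_messages,
        handler.(data) == :ok,
        do: ack_id
  end
end
```

The resulting list could then be passed to `PubsubGrpc.acknowledge("my-project", "my-sub", ack_ids)`; messages whose handler returned anything else are left unacknowledged.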
Validates a schema definition.
Parameters
- project_id: The Google Cloud project ID
- type: Schema type (:protocol_buffer or :avro)
- definition: The schema definition string
Returns
- {:ok, validation_result} - Schema is valid
- {:error, reason} - Validation error
Examples
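# Use a double-quoted heredoc so the definition is a string (binary);
# a ''' heredoc would produce a charlist instead.
definition = """
syntax = "proto3";
message User { string name = 1; }
"""
{:ok, result} = PubsubGrpc.validate_schema("my-project", :protocol_buffer, definition)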
Executes multiple operations using the same connection.
This is more efficient when you need to perform several operations in sequence as it reuses the same connection instead of checking out a new one each time.
Parameters
- fun: A function that receives a channel and performs multiple operations
Returns
- Result of the function execution
- {:error, reason} - Error executing operations
Examples
result = PubsubGrpc.with_connection(fn channel ->
  auth_opts = PubsubGrpc.Auth.request_opts()
  # Create topic
  topic_req = %Google.Pubsub.V1.Topic{name: "projects/my-project/topics/batch-topic"}
  {:ok, _topic} = Google.Pubsub.V1.Publisher.Stub.create_topic(channel, topic_req, auth_opts)
  # Publish message
  msg = %Google.Pubsub.V1.PubsubMessage{data: "Batch message"}
  pub_req = %Google.Pubsub.V1.PublishRequest{
    topic: "projects/my-project/topics/batch-topic",
    messages: [msg]
  }
  Google.Pubsub.V1.Publisher.Stub.publish(channel, pub_req, auth_opts)
end)