PubsubGrpc
A high-performance Elixir client for Google Cloud Pub/Sub, built on gRPC with connection pooling.
Features
- 🚀 High Performance: gRPC with connection pooling (2-3x faster than HTTP)
- 📦 Batch Publishing: Send up to 1,000 messages per API call (the Pub/Sub per-request limit)
- 🔄 Auto-Recovery: Health monitoring with exponential backoff
- 🔐 Easy Auth: Goth, gcloud CLI, service accounts, or GCE metadata
- 🐳 Dev-Friendly: Docker Compose emulator included
- 📋 Schema Support: Protocol Buffer and Avro schemas
Installation
def deps do
[
{:pubsub_grpc, "~> 0.3.0"}
]
end
Quick Start
Authentication
# Option 1: Use gcloud CLI (development)
gcloud auth application-default login
# Option 2: Set service account (production)
export GOOGLE_APPLICATION_CREDENTIALS="/path/to/service-account.json"
Basic Usage
# Start IEx
iex -S mix
# Publish messages (single or batch)
{:ok, _} = PubsubGrpc.publish_message("my-project", "my-topic", "Hello!")
messages = [
%{data: "Order #1", attributes: %{"type" => "order"}},
%{data: "Order #2", attributes: %{"type" => "order"}},
%{data: "Order #3", attributes: %{"type" => "order"}}
]
{:ok, response} = PubsubGrpc.publish("my-project", "my-topic", messages)
IO.puts("Published #{length(response.message_ids)} messages")
# Pull and process messages
{:ok, messages} = PubsubGrpc.pull("my-project", "my-subscription", 10)
Enum.each(messages, fn msg -> IO.puts("Received: #{msg.message.data}") end)
# Acknowledge messages
ack_ids = Enum.map(messages, & &1.ack_id)
:ok = PubsubGrpc.acknowledge("my-project", "my-subscription", ack_ids)
Complete Example
project_id = "my-project"
topic_id = "events"
subscription_id = "event-processor"
# 1. Create topic and subscription
{:ok, _topic} = PubsubGrpc.create_topic(project_id, topic_id)
{:ok, _sub} = PubsubGrpc.create_subscription(project_id, topic_id, subscription_id)
# 2. Publish batch of messages (much faster than one-by-one!)
messages = Enum.map(1..100, fn i ->
%{
data: Jason.encode!(%{event: "user_action", id: i}),
attributes: %{"source" => "app", "priority" => "high"}
}
end)
{:ok, _} = PubsubGrpc.publish(project_id, topic_id, messages)
# 3. Pull and process messages
{:ok, received} = PubsubGrpc.pull(project_id, subscription_id, 10)
Enum.each(received, fn msg ->
data = Jason.decode!(msg.message.data)
IO.puts("Processing event: #{data["id"]}")
# Your business logic here
end)
# 4. Acknowledge processed messages
ack_ids = Enum.map(received, & &1.ack_id)
:ok = PubsubGrpc.acknowledge(project_id, subscription_id, ack_ids)
# 5. Check pool health
GrpcConnectionPool.status(PubsubGrpc.ConnectionPool)
# => %{status: :healthy, current_size: 5, expected_size: 5}
Configuration
Production
# config/prod.exs
import Config
# No config needed - uses pubsub.googleapis.com:443
# Set GOOGLE_APPLICATION_CREDENTIALS environment variable
Development (Local Emulator)
# config/dev.exs
import Config
config :pubsub_grpc, :emulator,
project_id: "my-project-id",
host: "localhost",
port: 8085
Start the emulator:
docker-compose up -d
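The repository includes a Docker Compose file; if you need to write one yourself, a minimal sketch using the official gcloud CLI image looks like this (the image tag and project ID are assumptions — match the project ID to your emulator config above):

```yaml
services:
  pubsub-emulator:
    image: gcr.io/google.com/cloudsdktool/google-cloud-cli:emulators
    command: >
      gcloud beta emulators pubsub start
      --host-port=0.0.0.0:8085
      --project=my-project-id
    ports:
      - "8085:8085"
```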
API Reference
Topic Operations
# Create topic
{:ok, topic} = PubsubGrpc.create_topic(project_id, topic_id)
# List topics
{:ok, result} = PubsubGrpc.list_topics(project_id)
# Delete topic
:ok = PubsubGrpc.delete_topic(project_id, topic_id)
Publishing
# Single message
{:ok, response} = PubsubGrpc.publish_message(project_id, topic_id, "data")
{:ok, response} = PubsubGrpc.publish_message(project_id, topic_id, "data", %{"key" => "value"})
# Batch messages (recommended for performance)
messages = [
%{data: "message 1", attributes: %{"type" => "order"}},
%{data: "message 2", attributes: %{"type" => "payment"}}
]
{:ok, response} = PubsubGrpc.publish(project_id, topic_id, messages)
Subscription Operations
# Create subscription
{:ok, sub} = PubsubGrpc.create_subscription(project_id, topic_id, subscription_id)
{:ok, sub} = PubsubGrpc.create_subscription(project_id, topic_id, subscription_id,
ack_deadline_seconds: 30)
# Pull messages
{:ok, messages} = PubsubGrpc.pull(project_id, subscription_id)
{:ok, messages} = PubsubGrpc.pull(project_id, subscription_id, 100)
# Acknowledge messages
ack_ids = Enum.map(messages, & &1.ack_id)
:ok = PubsubGrpc.acknowledge(project_id, subscription_id, ack_ids)
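For a long-running consumer, the pull/acknowledge pair above can be wrapped in a simple polling loop. A minimal sketch — the module name, sleep intervals, and error handling are illustrative, not part of the library:

```elixir
defmodule MyApp.Consumer do
  # Repeatedly pull up to `max` messages, process each one, and ack on success.
  def run(project_id, subscription_id, max \\ 100) do
    case PubsubGrpc.pull(project_id, subscription_id, max) do
      {:ok, []} ->
        # Nothing pending; back off briefly before polling again
        Process.sleep(1_000)

      {:ok, received} ->
        Enum.each(received, fn msg -> handle(msg.message.data) end)
        ack_ids = Enum.map(received, & &1.ack_id)
        :ok = PubsubGrpc.acknowledge(project_id, subscription_id, ack_ids)

      {:error, reason} ->
        IO.warn("pull failed: #{inspect(reason)}")
        Process.sleep(5_000)
    end

    run(project_id, subscription_id, max)
  end

  defp handle(data), do: IO.puts("Processing: #{data}")
end
```

Unacknowledged messages are redelivered after the subscription's ack deadline, so only ack after your processing succeeds.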
# Delete subscription
:ok = PubsubGrpc.delete_subscription(project_id, subscription_id)
Schema Management
# List schemas
{:ok, result} = PubsubGrpc.list_schemas(project_id)
# Get schema
{:ok, schema} = PubsubGrpc.get_schema(project_id, schema_id)
# Create schema
schema_def = """
syntax = "proto3";
message Event {
string id = 1;
string type = 2;
}
"""
{:ok, schema} = PubsubGrpc.create_schema(project_id, schema_id, :protocol_buffer, schema_def)
# Validate schema
{:ok, _} = PubsubGrpc.validate_schema(project_id, :protocol_buffer, schema_def)
# Delete schema
:ok = PubsubGrpc.delete_schema(project_id, schema_id)
Performance Tips
Batch Publishing (5-10x faster)
# ❌ Slow: One message at a time
Enum.each(1..100, fn i ->
PubsubGrpc.publish_message(project_id, topic_id, "Message #{i}")
end)
# ✅ Fast: Batch all messages
messages = Enum.map(1..100, fn i -> %{data: "Message #{i}"} end)
PubsubGrpc.publish(project_id, topic_id, messages)
Connection Pool Monitoring
# Check pool health
GrpcConnectionPool.status(PubsubGrpc.ConnectionPool)
# Get a channel directly (advanced)
{:ok, channel} = GrpcConnectionPool.get_channel(PubsubGrpc.ConnectionPool)
Testing
# Start emulator
docker-compose up -d
# Run tests
mix test
# Stop emulator
docker-compose down
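With the emulator running, the library's public API can be exercised end to end. A minimal ExUnit sketch — the module, topic, and subscription names are hypothetical, and it assumes the emulator config shown above is active:

```elixir
defmodule MyApp.PubsubRoundTripTest do
  use ExUnit.Case

  @project "my-project-id"
  @topic "test-topic"
  @sub "test-sub"

  test "publish/pull round trip against the emulator" do
    {:ok, _topic} = PubsubGrpc.create_topic(@project, @topic)
    {:ok, _sub} = PubsubGrpc.create_subscription(@project, @topic, @sub)

    {:ok, _} = PubsubGrpc.publish(@project, @topic, [%{data: "ping"}])

    {:ok, received} = PubsubGrpc.pull(@project, @sub, 10)
    assert Enum.any?(received, &(&1.message.data == "ping"))

    ack_ids = Enum.map(received, & &1.ack_id)
    :ok = PubsubGrpc.acknowledge(@project, @sub, ack_ids)
  end
end
```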
Why gRPC?
- 2-3x better throughput than HTTP REST API
- 40-60% lower latency due to persistent connections
- Efficient binary protocol (protobuf vs JSON)
- HTTP/2 multiplexing over single connection
- Automatic health monitoring and recovery
Advanced Configuration
Custom Connection Pool
# config/prod.exs
config :pubsub_grpc, GrpcConnectionPool,
endpoint: [
type: :production,
host: "pubsub.googleapis.com",
port: 443,
ssl: []
],
pool: [
size: 10,
name: PubsubGrpc.ConnectionPool
],
connection: [
keepalive: 30_000,
ping_interval: 25_000
]
Using Goth for Authentication
# Add to dependencies
{:goth, "~> 1.4"}
# Add Goth to your supervision tree (Goth 1.3+ takes credentials
# via a :source tuple rather than application config)
credentials =
  "GOOGLE_APPLICATION_CREDENTIALS_JSON"
  |> System.fetch_env!()
  |> Jason.decode!()
{Goth, name: MyApp.Goth, source: {:service_account, credentials}}
Troubleshooting
Authentication Error
# Ensure you're authenticated
gcloud auth application-default login
# Or set service account
export GOOGLE_APPLICATION_CREDENTIALS="/path/to/key.json"
Connection Issues
# Check pool status
GrpcConnectionPool.status(PubsubGrpc.ConnectionPool)
# Should show: %{status: :healthy, current_size: 5, expected_size: 5}
# Restart application if needed
Emulator Not Working
# Check if emulator is running
docker ps | grep pubsub
# Restart emulator
docker-compose down
docker-compose up -d
# Check logs
docker-compose logs -f pubsub-emulator
Contributing
- Fork the repository
- Start emulator: docker-compose up -d
- Run tests: mix test
- Make changes and ensure tests pass
- Submit a pull request
License
MIT