PubsubGrpc

High-performance Google Cloud Pub/Sub client using gRPC with connection pooling.

Hex.pm

Features

  • 🚀 High Performance: gRPC with connection pooling (2-3x faster than HTTP)
  • 📦 Batch Publishing: Send 100-1000+ messages per API call
  • 🔄 Auto-Recovery: Health monitoring with exponential backoff
  • 🔐 Easy Auth: Goth, gcloud CLI, service accounts, or GCE metadata
  • 🐳 Dev-Friendly: Docker Compose emulator included
  • 📋 Schema Support: Protocol Buffer and Avro schemas

Installation

def deps do
  [
    {:pubsub_grpc, "~> 0.3.0"}
  ]
end

Quick Start

Authentication

# Option 1: Use gcloud CLI (development)
gcloud auth application-default login

# Option 2: Set service account (production)
export GOOGLE_APPLICATION_CREDENTIALS="/path/to/service-account.json"

Basic Usage

# Start IEx
iex -S mix

# Publish messages (single or batch)
{:ok, _} = PubsubGrpc.publish_message("my-project", "my-topic", "Hello!")

messages = [
  %{data: "Order #1", attributes: %{"type" => "order"}},
  %{data: "Order #2", attributes: %{"type" => "order"}},
  %{data: "Order #3", attributes: %{"type" => "order"}}
]
{:ok, response} = PubsubGrpc.publish("my-project", "my-topic", messages)
IO.puts("Published #{length(response.message_ids)} messages")

# Pull and process messages
{:ok, messages} = PubsubGrpc.pull("my-project", "my-subscription", 10)
Enum.each(messages, fn msg -> IO.puts("Received: #{msg.message.data}") end)

# Acknowledge messages
ack_ids = Enum.map(messages, & &1.ack_id)
:ok = PubsubGrpc.acknowledge("my-project", "my-subscription", ack_ids)

Complete Example

project_id = "my-project"
topic_id = "events"
subscription_id = "event-processor"

# 1. Create topic and subscription
{:ok, _topic} = PubsubGrpc.create_topic(project_id, topic_id)
{:ok, _sub} = PubsubGrpc.create_subscription(project_id, topic_id, subscription_id)

# 2. Publish batch of messages (much faster than one-by-one!)
messages = Enum.map(1..100, fn i ->
  %{
    data: Jason.encode!(%{event: "user_action", id: i}),
    attributes: %{"source" => "app", "priority" => "high"}
  }
end)
{:ok, _} = PubsubGrpc.publish(project_id, topic_id, messages)

# 3. Pull and process messages
{:ok, received} = PubsubGrpc.pull(project_id, subscription_id, 10)
Enum.each(received, fn msg ->
  data = Jason.decode!(msg.message.data)
  IO.puts("Processing event: #{data["id"]}")
  # Your business logic here
end)

# 4. Acknowledge processed messages
ack_ids = Enum.map(received, & &1.ack_id)
:ok = PubsubGrpc.acknowledge(project_id, subscription_id, ack_ids)

# 5. Check pool health
GrpcConnectionPool.status(PubsubGrpc.ConnectionPool)
# => %{status: :healthy, current_size: 5, expected_size: 5}

Configuration

Production

# config/prod.exs
import Config

# No config needed - uses pubsub.googleapis.com:443
# Set GOOGLE_APPLICATION_CREDENTIALS environment variable

Development (Local Emulator)

# config/dev.exs
import Config

config :pubsub_grpc, :emulator,
  project_id: "my-project-id",
  host: "localhost",
  port: 8085

Start the emulator:

docker-compose up -d

API Reference

Topic Operations

# Create topic
{:ok, topic} = PubsubGrpc.create_topic(project_id, topic_id)

# List topics
{:ok, result} = PubsubGrpc.list_topics(project_id)

# Delete topic
:ok = PubsubGrpc.delete_topic(project_id, topic_id)

Publishing

# Single message
{:ok, response} = PubsubGrpc.publish_message(project_id, topic_id, "data")
{:ok, response} = PubsubGrpc.publish_message(project_id, topic_id, "data", %{"key" => "value"})

# Batch messages (recommended for performance)
messages = [
  %{data: "message 1", attributes: %{"type" => "order"}},
  %{data: "message 2", attributes: %{"type" => "payment"}}
]
{:ok, response} = PubsubGrpc.publish(project_id, topic_id, messages)

Subscription Operations

# Create subscription
{:ok, sub} = PubsubGrpc.create_subscription(project_id, topic_id, subscription_id)
{:ok, sub} = PubsubGrpc.create_subscription(project_id, topic_id, subscription_id,
  ack_deadline_seconds: 30)

# Pull messages
{:ok, messages} = PubsubGrpc.pull(project_id, subscription_id)
{:ok, messages} = PubsubGrpc.pull(project_id, subscription_id, 100)

# Acknowledge messages
ack_ids = Enum.map(messages, & &1.ack_id)
:ok = PubsubGrpc.acknowledge(project_id, subscription_id, ack_ids)

# Delete subscription
:ok = PubsubGrpc.delete_subscription(project_id, subscription_id)

Schema Management

# List schemas
{:ok, result} = PubsubGrpc.list_schemas(project_id)

# Get schema
{:ok, schema} = PubsubGrpc.get_schema(project_id, schema_id)

# Create schema
schema_def = """
syntax = "proto3";
message Event {
  string id = 1;
  string type = 2;
}
"""
{:ok, schema} = PubsubGrpc.create_schema(project_id, schema_id, :protocol_buffer, schema_def)

# Validate schema
{:ok, _} = PubsubGrpc.validate_schema(project_id, :protocol_buffer, schema_def)

# Delete schema
:ok = PubsubGrpc.delete_schema(project_id, schema_id)

Performance Tips

Batch Publishing (5-10x faster)

# ❌ Slow: One message at a time
Enum.each(1..100, fn i ->
  PubsubGrpc.publish_message(project_id, topic_id, "Message #{i}")
end)

# ✅ Fast: Batch all messages
messages = Enum.map(1..100, fn i -> %{data: "Message #{i}"} end)
PubsubGrpc.publish(project_id, topic_id, messages)

Connection Pool Monitoring

# Check pool health
GrpcConnectionPool.status(PubsubGrpc.ConnectionPool)

# Get a channel directly (advanced)
{:ok, channel} = GrpcConnectionPool.get_channel(PubsubGrpc.ConnectionPool)

Testing

# Start emulator
docker-compose up -d

# Run tests
mix test

# Stop emulator
docker-compose down

Why gRPC?

  • 2-3x better throughput than HTTP REST API
  • 40-60% lower latency due to persistent connections
  • Efficient binary protocol (protobuf vs JSON)
  • HTTP/2 multiplexing over single connection
  • Automatic health monitoring and recovery

Advanced Configuration

Custom Connection Pool

# config/prod.exs
config :pubsub_grpc, GrpcConnectionPool,
  endpoint: [
    type: :production,
    host: "pubsub.googleapis.com",
    port: 443,
    ssl: []
  ],
  pool: [
    size: 10,
    name: PubsubGrpc.ConnectionPool
  ],
  connection: [
    keepalive: 30_000,
    ping_interval: 25_000
  ]

Using Goth for Authentication

# Add to dependencies
{:goth, "~> 1.4"}

# Configure
config :goth, json: {:system, "GOOGLE_APPLICATION_CREDENTIALS_JSON"}

# Add to supervision tree
{Goth, name: MyApp.Goth, source: {:service_account, credentials}}

Troubleshooting

Authentication Error

# Ensure you're authenticated
gcloud auth application-default login

# Or set service account
export GOOGLE_APPLICATION_CREDENTIALS="/path/to/key.json"

Connection Issues

# Check pool status
GrpcConnectionPool.status(PubsubGrpc.ConnectionPool)
# Should show: %{status: :healthy, current_size: 5, expected_size: 5}

# Restart application if needed

Emulator Not Working

# Check if emulator is running
docker ps | grep pubsub

# Restart emulator
docker-compose down
docker-compose up -d

# Check logs
docker-compose logs -f pubsub-emulator

Contributing

  1. Fork the repository
  2. Start emulator: docker-compose up -d
  3. Run tests: mix test
  4. Make changes and ensure tests pass
  5. Submit a pull request

License

MIT