ExESDBGrpc

Hex.pm Documentation License: MIT

EventStore-compatible gRPC API server for ExESDB event store clusters

ExESDBGrpc provides a production-ready gRPC API that enables external clients to interact with ExESDB event stores using the EventStore gRPC protocol. Built on top of ex_esdb_gater, it offers complete stream operations, real-time subscriptions, transaction support, and comprehensive monitoring capabilities.

🌟 Features

  • 🔌 EventStore-compatible gRPC API - Drop-in replacement for EventStore gRPC clients
  • 📖 Complete stream operations - Read, write, delete, and manage event streams
  • 📡 Real-time subscriptions - Live event streaming with automatic cleanup
  • 🏥 Health monitoring - Built-in health checks and service monitoring
  • 🔄 Transaction support - Atomic operations with optimistic concurrency control
  • 📊 Comprehensive telemetry - Performance monitoring and observability
  • High performance - Built on Elixir/OTP for maximum concurrency
  • 🛡️ Production ready - Proper error handling, logging, and resource management

🏗️ Architecture

ExESDBGrpc is part of the modular ExESDB ecosystem:

        
   gRPC Clients    ex_esdb_grpc     ex_esdb_gater   
  (Any language)       (gRPC Server)        (Core Engine)   
        
  • ExESDBGrpc (this package) - EventStore-compatible gRPC API server
  • ExESDBGater - Core event store engine with clustering and persistence
  • ExESDBDashboard - LiveView web interface for monitoring and administration

🚀 Installation

Add ex_esdb_grpc to your dependencies in mix.exs:

def deps do
  [
    {:ex_esdb_grpc, "~> 0.5.0"},
    {:ex_esdb_gater, "~> 0.8.0"}  # Core event store functionality
  ]
end

📖 Quick Start

Basic Setup

Add the gRPC server to your application's supervision tree:

defmodule MyApp.Application do
  use Application

  def start(_type, _args) do
    children = [
      # Your existing supervisors...
      {ExESDBGrpc.Server, [port: 50_051]}
    ]

    opts = [strategy: :one_for_one, name: MyApp.Supervisor]
    Supervisor.start_link(children, opts)
  end
end

Configuration

Configure the gRPC server in your config.exs:

config :ex_esdb_grpc,
  enabled: true,      # Enable/disable gRPC server
  port: 50051        # gRPC server port

Runtime Information

Check server status programmatically:

# Check if server is running
ExESDBGrpc.server_running?()
# => true

# Get server information
ExESDBGrpc.server_info()
# => %{server_pid: #PID<0.123.0>, port: 50051, ip: {0, 0, 0, 0}}

# Get version
ExESDBGrpc.version()
# => "0.5.0"

🔧 gRPC API Reference

EventStore Service

The gRPC API provides full EventStore compatibility:

| Method | Description | Streaming | |--------|-------------|-----------|| | WriteEvents | Write events to a stream with optimistic concurrency | No | | ReadEvent | Read a single event by stream and event number | No | | ReadStreamEvents | Read multiple events from a stream with pagination | No | | ReadAllEvents | Read from the global event stream | No | | SubscribeToStream | Subscribe to stream events with real-time updates | Server streaming | | DeleteStream | Soft delete a stream | No | | GetStreamInfo | Retrieve stream metadata and statistics | No | | HealthCheck | Service health monitoring | No |

Transaction Support

MethodDescription
StartTransactionBegin a new transaction
WriteToTransactionWrite events to an open transaction
CommitTransactionCommit a transaction atomically

🌐 Client Examples

C# (.NET)

using EventStore.Client;

// Connect to ExESDBGrpc server
var settings = EventStoreClientSettings.Create("esdb://localhost:50051?tls=false");
var client = new EventStoreClient(settings);

// Write events
var eventData = new EventData(
    Uuid.NewUuid(), 
    "UserRegistered", 
    JsonSerializer.SerializeToUtf8Bytes(new { UserId = 123, Email = "user@example.com" })
);

await client.AppendToStreamAsync("user-123", StreamState.Any, new[] { eventData });

// Read events
var events = client.ReadStreamAsync(Direction.Forwards, "user-123", StreamPosition.Start);
await foreach (var @event in events)
{
    Console.WriteLine($"Event: {@event.Event.EventType}");
}

// Subscribe to stream
using var subscription = client.SubscribeToStream("user-123", FromStream.Start);
await foreach (var message in subscription)
{
    Console.WriteLine($"Received: {message.Event.EventType}");
}

Python

import grpc
import json
from eventstore_pb2_grpc import EventStoreStub
from eventstore_pb2 import *

# Connect to ExESDBGrpc server
channel = grpc.insecure_channel('localhost:50051')
client = EventStoreStub(channel)

# Write events
write_request = WriteEvents(
    event_stream_id="user-123",
    expected_version=-2,  # Any version
    events=[
        NewEvent(
            event_id=b"unique-event-id",
            event_type="UserRegistered",
            data=json.dumps({"user_id": 123, "email": "user@example.com"}).encode(),
            metadata=b"{}"
        )
    ]
)

response = client.WriteEvents(write_request)
print(f"Events written: {response.result}")

# Read events
read_request = ReadStreamEvents(
    event_stream_id="user-123",
    from_event_number=0,
    max_count=100
)

response = client.ReadStreamEvents(read_request)
for event in response.events:
    print(f"Event: {event.event.event_type}")

# Subscribe to stream
subscribe_request = SubscribeToStream(event_stream_id="user-123")
for event in client.SubscribeToStream(subscribe_request):
    print(f"Received: {event.event.event.event_type}")

Node.js

const grpc = require('@grpc/grpc-js');
const protoLoader = require('@grpc/proto-loader');

// Load EventStore proto definitions
const packageDefinition = protoLoader.loadSync('EventStore.proto');
const eventstore = grpc.loadPackageDefinition(packageDefinition).eventstore;

// Connect to ExESDBGrpc server
const client = new eventstore.EventStore('localhost:50051', grpc.credentials.createInsecure());

// Write events
const writeRequest = {
  event_stream_id: 'user-123',
  expected_version: -2,
  events: [{
    event_id: Buffer.from('unique-event-id'),
    event_type: 'UserRegistered',
    data: Buffer.from(JSON.stringify({ user_id: 123, email: 'user@example.com' })),
    metadata: Buffer.from('{}')
  }]
};

client.WriteEvents(writeRequest, (error, response) => {
  if (error) {
    console.error('Error:', error);
  } else {
    console.log('Events written:', response.result);
  }
});

// Subscribe to stream
const subscription = client.SubscribeToStream({ event_stream_id: 'user-123' });
subscription.on('data', (event) => {
  console.log('Received:', event.event.event.event_type);
});

📊 Monitoring and Observability

ExESDBGrpc includes comprehensive telemetry and monitoring:

Telemetry Events

The package emits detailed telemetry events for monitoring:

# Server lifecycle
[:ex_esdb_grpc, :server, :started]
[:ex_esdb_grpc, :server, :stopped]

# Client connections
[:ex_esdb_grpc, :client, :connected]
[:ex_esdb_grpc, :client, :disconnected]

# RPC calls
[:ex_esdb_grpc, :rpc, :call, :start]
[:ex_esdb_grpc, :rpc, :call, :stop]
[:ex_esdb_grpc, :rpc, :call, :error]

# Stream operations
[:ex_esdb_grpc, :stream, :subscription, :start]
[:ex_esdb_grpc, :stream, :subscription, :stop]
[:ex_esdb_grpc, :event, :read, :start]
[:ex_esdb_grpc, :event, :write, :stop]

Health Monitoring

# Get comprehensive metrics
metrics = ExESDBGrpc.Telemetry.get_metrics()

# Get health status
health = ExESDBGrpc.Telemetry.get_health()

# Reset metrics (useful for testing)
ExESDBGrpc.Telemetry.reset_metrics()

Custom Event Emission

# Emit custom telemetry events
ExESDBGrpc.Telemetry.emit(:custom_event, %{
  client_id: "my-client",
  operation: "bulk_write",
  event_count: 1000
})

🔧 Development

Prerequisites

  • Elixir 1.14 or later
  • Erlang/OTP 25 or later

Setup

# Clone the repository
git clone https://github.com/beam-campus/ex-esdb-grpc.git
cd ex-esdb-grpc/package

# Get dependencies
mix deps.get

# Compile the project
mix compile

# Run tests
mix test

# Generate documentation
mix docs

# Start interactive session
iex -S mix

Testing gRPC Services

# Install grpcurl for testing
go install github.com/fullstorydev/grpcurl/cmd/grpcurl@latest

# List available services
grpcurl -plaintext localhost:50051 list

# Call health check
grpcurl -plaintext -d '{"service":""}' \
  localhost:50051 grpc.health.v1.Health/Check

# Test event writing (requires proper protobuf definitions)
grpcurl -plaintext -d '{
  "event_stream_id": "test-stream",
  "expected_version": -2,
  "events": [
    {
      "event_id": "dGVzdC1ldmVudC0x",
      "event_type": "TestEvent",
      "data": "eyJ0ZXN0IjogInZhbHVlIn0=",
      "metadata": "e30="
    }
  ]
}' localhost:50051 eventstore.EventStore/WriteEvents

Code Quality

The project uses several tools to maintain code quality:

# Code formatting
mix format

# Static analysis
mix credo

# Type checking (if using Dialyzer)
mix dialyzer

# Documentation coverage
mix inch

🤝 Contributing

We welcome contributions! Please follow these steps:

  1. Fork the repository
  2. Create a feature branch (git checkout -b feature/amazing-feature)
  3. Follow our coding standards:
    • Use idiomatic Elixir with pattern matching
    • Favor multiple function clauses over case statements
    • Write comprehensive tests
    • Include documentation for public APIs
  4. Test your changes (mix test)
  5. Commit your changes (git commit -am 'Add amazing feature')
  6. Push to your branch (git push origin feature/amazing-feature)
  7. Open a Pull Request

Code Style

This project follows idiomatic Elixir patterns:

  • Multiple function clauses with pattern matching over case statements
  • Comprehensive error handling with proper pattern matching
  • Minimal use of try/rescue constructs
  • Clear separation of concerns with focused modules

📝 Changelog

See CHANGELOG.md for detailed release history.

📄 License

This project is licensed under the MIT License - see the LICENSE file for details.

🙏 Acknowledgments

  • EventStore team for the excellent gRPC protocol specification
  • grpc-elixir for the robust Elixir gRPC implementation
  • The Elixir community for creating an amazing ecosystem

📞 Support


Built with ❤️ by the Beam Campus team