๐Ÿ™ Franz

A powerful Kafka client for Gleam
Build reliable event-driven systems with ease

Package Version Hex Docs CI

Quickstart | Examples | API Docs | Configuration


✨ Features

📦 Installation

Add Franz to your Gleam project:

gleam add franz

🚀 Quickstart

Here's a simple example to get you started with Franz:

import franz
import franz/producer
import franz/group_subscriber
import franz/message_type
import gleam/io
import gleam/erlang/process

/// Quickstart entry point: connects to a local broker, sends one message
/// synchronously and prints a confirmation.
///
/// Both steps use `let assert`, so the program crashes if the client cannot
/// be started or if the produce call does not return `Ok`.
pub fn main() {
  // Connect to Kafka; AutoStartProducers(True) is set so no producer is
  // started explicitly before producing below.
  let assert Ok(client) =
    franz.new([franz.Endpoint("localhost", 9092)])
    |> franz.with_config(franz.AutoStartProducers(True))
    |> franz.start()

  // Send a message to partition 0 of the "greetings" topic, with no headers
  let assert Ok(_) =
    producer.produce_sync(
      client: client,
      topic: "greetings",
      partition: producer.Partition(0),
      key: <<"user:123">>,
      value: producer.Value(<<"Hello, Franz!">>, []),
    )

  io.println("Message sent successfully! 🎉")
}

📚 Examples

🎬 Starting a Client

Franz supports multiple configuration options for connecting to your Kafka cluster:

import franz
import franz/sasl

/// Builds a client for two broker endpoints and starts it.
///
/// The client is configured with `AutoStartProducers(True)` and the client
/// id "my-gleam-app". The value of `franz.start` is returned unchanged.
pub fn connect_to_kafka() {
  // Broker endpoints handed to the client builder
  let brokers = [
    franz.Endpoint("broker1.example.com", 9092),
    franz.Endpoint("broker2.example.com", 9092),
  ]

  let builder =
    franz.new(brokers)
    |> franz.with_config(franz.AutoStartProducers(True))
    |> franz.with_config(franz.ClientId("my-gleam-app"))

  franz.start(builder)
}

/// Builds a client that uses SASL PLAIN credentials together with SSL
/// options, then starts it. The value of `franz.start` is returned unchanged.
pub fn connect_with_auth() {
  let broker = franz.Endpoint("secure.broker.com", 9092)
  let credentials = sasl.Plain("username", "password")

  // SslOptions is given positionally here; the Security section of this
  // document labels the same three fields verify / cacertfile /
  // server_name_indication.
  let tls = franz.SslOptions(True, None, None)

  franz.new([broker])
  |> franz.with_config(franz.Sasl(credentials))
  |> franz.with_config(tls)
  |> franz.start()
}

📤 Producing Messages

Franz offers multiple ways to produce messages to Kafka:

Synchronous Production

import franz/producer
import gleam/bit_array

/// Starts a producer for the "user-events" topic and synchronously sends
/// `event`, keyed by `user_id`.
///
/// Crashes (via `let assert`) if the producer cannot be started. Returns the
/// value of `producer.produce_sync`.
pub fn send_user_event(client: franz.Client, user_id: String, event: String) {
  let topic = "user-events"

  // NOTE(review): RequiredAcks takes an Int here but `producer_config.All`
  // in other examples of this document; confirm which form the API accepts.
  let assert Ok(_) =
    producer.new(client, topic)
    |> producer.with_config(producer_config.RequiredAcks(1))
    |> producer.with_config(producer_config.Compression(producer_config.Gzip))
    |> producer.start()

  // Message body plus a single header
  let payload =
    producer.Value(bit_array.from_string(event), [
      #("event-type", "user-action"),
    ])

  // producer.Hash: use hash partitioner based on key
  producer.produce_sync(
    client: client,
    topic: topic,
    partition: producer.Hash,
    key: bit_array.from_string(user_id),
    value: payload,
  )
}

Asynchronous Production with Callback

import franz/producer

/// Produces one message to "async-events" without waiting for the result and
/// registers a callback that prints the partition and offset it receives.
pub fn send_async_with_confirmation(client: franz.Client) {
  // Invoked with the partition and offset of the delivered message
  let on_delivery = fn(partition, offset) {
    io.println("Message delivered to partition " <> int.to_string(partition))
    io.println("At offset " <> int.to_string(offset))
  }

  producer.produce(
    client: client,
    topic: "async-events",
    // Random partition
    partition: producer.Random,
    key: <<"">>,
    value: producer.Value(<<"async data">>, []),
    callback: on_delivery,
  )
}

Custom Partitioner

import franz/producer
import gleam/string
import gleam/int

/// Sends one message to "custom-partitioned" using a partitioner that picks
/// the partition from the first character of the message key.
///
/// Returns the value of `producer.produce_sync`.
pub fn custom_partitioner_example(client: franz.Client) {
  let custom_partitioner =
    producer.PartitionFun(fn(_topic, partition_count, key, _value) {
      // Always answers Ok: every failure case falls back to partition 0
      Ok(partition_for_key(key, partition_count))
    })

  producer.produce_sync(
    client: client,
    topic: "custom-partitioned",
    partition: producer.Partitioner(custom_partitioner),
    key: <<"custom-key">>,
    value: producer.Value(<<"data">>, []),
  )
}

/// Maps a key to a partition: the integer value of the key's first UTF
/// codepoint, modulo `partition_count`.
///
/// Falls back to 0 when the key is not valid UTF-8, is empty, or when
/// `partition_count` is 0 (`int.modulo` returns an error for a zero divisor).
fn partition_for_key(key: BitArray, partition_count: Int) -> Int {
  case bit_array.to_string(key) {
    Ok(text) ->
      case string.to_utf_codepoints(text) {
        [first, ..] ->
          case int.modulo(string.utf_codepoint_to_int(first), partition_count) {
            Ok(partition) -> partition
            Error(_) -> 0
          }
        [] -> 0
      }
    Error(_) -> 0
  }
}

📥 Consuming Messages

Franz provides two main consumer types: Group Subscribers and Topic Subscribers.

Group Subscriber (Consumer Groups)

Perfect for scalable, fault-tolerant consumption:

import franz/group_subscriber
import franz/message_type
import franz/group_config
import franz/consumer_config
import gleam/io
import gleam/json
import gleam/dynamic

/// A user event record with an id, an action name and a timestamp.
/// NOTE(review): the unit of `timestamp` is not stated anywhere in this
/// example — confirm before relying on it.
pub type UserEvent {
  UserEvent(id: String, action: String, timestamp: Int)
}

/// Starts a group subscriber in the "analytics-processors" group for the
/// "user-events" and "system-events" topics, delivering message sets to
/// `process_message_batch`.
///
/// Returns the value of `group_subscriber.start`.
pub fn start_consumer_group(client: franz.Client) {
  let subscriber =
    group_subscriber.new(
      client: client,
      group_id: "analytics-processors",
      topics: ["user-events", "system-events"],
      // Receive batches
      message_type: message_type.MessageSet,
      callback: process_message_batch,
      init_callback_state: InitialState(),
    )

  let session_timeout = group_config.SessionTimeout(30_000)
  let begin_offset = consumer_config.BeginOffset(consumer_config.Latest)

  subscriber
  |> group_subscriber.with_group_config(session_timeout)
  |> group_subscriber.with_consumer_config(begin_offset)
  |> group_subscriber.start()
}

/// Group-subscriber callback that handles either a message set or a single
/// message.
///
/// A message set is processed message by message and then committed with
/// `group_subscriber.commit(state)`; a single message is processed and then
/// acknowledged with `group_subscriber.ack(state)`. Processing failures are
/// logged in both cases and never stop the callback.
fn process_message_batch(messages: franz.KafkaMessage, state: State) {
  case messages {
    franz.KafkaMessageSet(topic, _partition, _high_wm, batch) -> {
      io.println(
        "Processing "
        <> int.to_string(list.length(batch))
        <> " messages from "
        <> topic,
      )

      list.each(batch, process_single_message_logged)

      // Commit the batch after processing
      group_subscriber.commit(state)
    }

    franz.KafkaMessage(..) -> {
      // Single message processing; a failure is logged rather than dropped
      process_single_message_logged(messages)
      group_subscriber.ack(state)
    }
  }
}

/// Runs `process_single_message` and writes any error to stderr.
fn process_single_message_logged(message: franz.KafkaMessage) -> Nil {
  case process_single_message(message) {
    Ok(_) -> Nil
    Error(err) -> io.println_error("Failed to process: " <> err)
  }
}

Topic Subscriber (Direct Subscription)

For simple, direct topic consumption:

import franz/topic_subscriber
import franz/consumer_config
import franz/partitions
import gleam/otp/task

/// Subscribes to partitions 0, 1 and 2 of the "notifications" topic and
/// handles each message in an inline callback.
///
/// The callback prints the message offset and hands the message value to
/// `process_notification` inside a `task.async`. Returns the value of
/// `topic_subscriber.start`.
pub fn subscribe_to_notifications(client: franz.Client) {
  // Subscribe to specific partitions
  topic_subscriber.new(
    client: client,
    topic: "notifications",
    partitions: partitions.Partitions([0, 1, 2]),
    message_type: message_type.Message,
    callback: fn(message, _state) {
      // franz.KafkaMessage has a second constructor (KafkaMessageSet), so a
      // plain `let` is not exhaustive. The subscription asks for
      // message_type.Message, so presumably only this shape arrives.
      let assert franz.KafkaMessage(offset, _, value, _, _, _) = message

      // Process notification
      io.println("Notification received at offset " <> int.to_string(offset))

      // Spawn async task for heavy processing; its result is not awaited
      task.async(fn() { process_notification(value) })

      Nil
    },
    init_callback_state: Nil,
  )
  |> topic_subscriber.with_consumer_config(
    // 1MB max
    consumer_config.MaxBytes(1_048_576),
  )
  |> topic_subscriber.start()
}

/// Subscribes to every partition of the "events" topic, delivering message
/// sets to `handle_event_batch`. Returns the value of
/// `topic_subscriber.start`.
pub fn subscribe_to_all_partitions(client: franz.Client) {
  // partitions.All: subscribe to all partitions
  let subscriber =
    topic_subscriber.new(
      client: client,
      topic: "events",
      partitions: partitions.All,
      message_type: message_type.MessageSet,
      callback: handle_event_batch,
      init_callback_state: Nil,
    )

  topic_subscriber.start(subscriber)
}

🔧 Advanced Configuration

Producer Configuration

import franz/producer
import franz/producer_config

/// Starts a producer for "configured-topic" with batching, reliability and
/// performance options applied in that order. Returns the value of
/// `producer.start`.
pub fn configured_producer(client: franz.Client) {
  // Batching configuration
  let batched =
    producer.new(client, "configured-topic")
    |> producer.with_config(producer_config.BatchSize(100))
    |> producer.with_config(producer_config.LingerMs(10))

  // Reliability configuration
  let reliable =
    batched
    |> producer.with_config(producer_config.RequiredAcks(producer_config.All))
    |> producer.with_config(producer_config.MaxRetries(3))

  // Performance configuration
  let tuned =
    reliable
    |> producer.with_config(producer_config.Compression(producer_config.Snappy))
    |> producer.with_config(producer_config.BufferMemory(33_554_432))

  producer.start(tuned)
}

Consumer Configuration

import franz/consumer_config
import franz/group_config
import franz/isolation_level

/// Starts a group subscriber in "configured-group" for "topic1" and "topic2"
/// with explicit consumer and group settings.
///
/// `client` is the started Franz client the subscriber is attached to; the
/// body needs it, so it is taken as a parameter. Returns the value of
/// `group_subscriber.start`.
pub fn configured_consumer(client: franz.Client) {
  group_subscriber.new(
    client: client,
    group_id: "configured-group",
    topics: ["topic1", "topic2"],
    message_type: message_type.MessageSet,
    callback: process,
    init_callback_state: Nil,
  )
  // Consumer configs
  |> group_subscriber.with_consumer_config(consumer_config.MinBytes(1024))
  |> group_subscriber.with_consumer_config(consumer_config.MaxWaitTime(100))
  |> group_subscriber.with_consumer_config(
    consumer_config.IsolationLevel(isolation_level.ReadCommitted),
  )
  // Group configs
  |> group_subscriber.with_group_config(group_config.ProtocolType("consumer"))
  |> group_subscriber.with_group_config(group_config.HeartbeatRate(1000))
  |> group_subscriber.with_group_config(group_config.RejoinDelayMs(10_000))
  |> group_subscriber.start()
}

🎯 Real-World Example: Event Sourcing System

Here's a complete example of an event sourcing system using Franz:

import franz
import franz/producer
import franz/group_subscriber
import franz/message_type
import gleam/json
import gleam/dynamic
import gleam/result
import gleam/option.{None, Some}

// Domain events
/// Events published to and projected from the "domain-events" topic in this
/// example.
pub type DomainEvent {
  /// A user was created with the given id, name and email.
  UserCreated(id: String, name: String, email: String)
  /// A user changed; `changes` is a list of string pairs.
  /// NOTE(review): presumably #(field, new_value) — confirm against the
  /// serializer, which is not shown here.
  UserUpdated(id: String, changes: List(#(String, String)))
  /// The user with the given id was deleted.
  UserDeleted(id: String)
}

// Event store
/// Starts a client for localhost:9092 (client id "event-store"), starts a
/// producer for the "domain-events" topic, and returns the client.
///
/// Crashes (via `let assert`) if either the client or the producer fails to
/// start.
pub fn create_event_store() {
  let connection =
    franz.new([franz.Endpoint("localhost", 9092)])
    |> franz.with_config(franz.AutoStartProducers(True))
    |> franz.with_config(franz.ClientId("event-store"))
    |> franz.start()
  let assert Ok(client) = connection

  // Start the events producer
  let events_producer =
    producer.new(client, "domain-events")
    |> producer.with_config(producer_config.RequiredAcks(producer_config.All))
    |> producer.with_config(producer_config.Compression(producer_config.Gzip))
    |> producer.start()
  let assert Ok(_) = events_producer

  client
}

// Publish an event
/// Serializes `event` and synchronously produces it to "domain-events" with
/// "event-type" and "timestamp" headers, using the hash partitioner.
///
/// Returns the value of `producer.produce_sync`.
pub fn publish_event(client: franz.Client, event: DomainEvent) {
  let #(event_key, payload) = serialize_event(event)

  // Headers attached to every published event
  let headers = [
    #("event-type", event_type(event)),
    #("timestamp", int.to_string(current_timestamp())),
  ]

  producer.produce_sync(
    client: client,
    topic: "domain-events",
    partition: producer.Hash,
    key: event_key,
    value: producer.Value(payload, headers),
  )
}

// Event projection
/// Starts a group subscriber that feeds single "domain-events" messages to
/// `project_event`, beginning from the earliest offset.
///
/// The consumer group id is `projection_name` with "-projection" appended.
/// Returns the value of `group_subscriber.start`.
pub fn start_projection(client: franz.Client, projection_name: String) {
  let group = projection_name <> "-projection"
  let initial_state = ProjectionState(name: projection_name, processed: 0)
  let from_earliest = consumer_config.BeginOffset(consumer_config.Earliest)

  group_subscriber.new(
    client: client,
    group_id: group,
    topics: ["domain-events"],
    message_type: message_type.Message,
    callback: project_event,
    init_callback_state: initial_state,
  )
  |> group_subscriber.with_consumer_config(from_earliest)
  |> group_subscriber.start()
}

/// Group-subscriber callback that deserializes one message into a
/// `DomainEvent` and applies it to the read model.
///
/// On success the matching projection helper is called and the message is
/// committed with `group_subscriber.commit(state)`. On a deserialization
/// error the error is logged and the message is acknowledged with
/// `group_subscriber.ack(state)`.
fn project_event(message: franz.KafkaMessage, state: ProjectionState) {
  // franz.KafkaMessage has a second constructor (KafkaMessageSet), so a plain
  // `let` is not exhaustive. The projection subscribes with
  // message_type.Message, so presumably only this shape arrives.
  let assert franz.KafkaMessage(_, _, value, _, _, headers) = message

  // Deserialize and process the event
  case deserialize_event(value, headers) {
    Ok(event) -> {
      case event {
        // Update read model
        UserCreated(id, name, email) -> update_user_projection(id, name, email)
        UserUpdated(id, changes) -> apply_user_changes(id, changes)
        UserDeleted(id) -> remove_user_projection(id)
      }

      // Acknowledge processing
      group_subscriber.commit(state)
    }
    Error(err) -> {
      io.println_error("Failed to deserialize event: " <> err)
      // Decide whether to skip or retry
      group_subscriber.ack(state)
    }
  }
}

// Usage
/// Usage example: creates the event store, starts two projections, publishes
/// one `UserCreated` event and then blocks forever.
pub fn main() {
  let alice_created =
    UserCreated(id: "user-123", name: "Alice", email: "alice@example.com")

  let client = create_event_store()

  // Start projections
  start_projection(client, "user-read-model")
  start_projection(client, "analytics")

  // Publish events
  publish_event(client, alice_created)

  // Keep the application running
  process.sleep_forever()
}

๐Ÿ” Security

SASL Authentication

Franz supports multiple SASL mechanisms:

import franz/sasl

// Three alternative client builders, one per SASL mechanism.
// `endpoints` is not defined in this snippet, and each pipeline stops after
// `with_config`: none of them calls `franz.start()`.

// PLAIN authentication
franz.new(endpoints)
|> franz.with_config(franz.Sasl(sasl.Plain("username", "password")))

// SCRAM-SHA-256
franz.new(endpoints)
|> franz.with_config(franz.Sasl(sasl.ScramSha256("username", "password")))

// SCRAM-SHA-512
franz.new(endpoints)
|> franz.with_config(franz.Sasl(sasl.ScramSha512("username", "password")))

SSL/TLS Configuration

import franz

// Client builder with SSL options passed through the labelled fields of
// `franz.SslOptions`. `endpoints` is not defined in this snippet and the
// pipeline does not call `franz.start()`.
// NOTE(review): `Some` comes from gleam/option, which this snippet does not
// import.
franz.new(endpoints)
|> franz.with_config(franz.SslOptions(
  verify: True,
  cacertfile: Some("/path/to/ca.crt"),
  server_name_indication: Some("kafka.example.com")
))

📊 Monitoring & Observability

Franz provides detailed error types for comprehensive monitoring:

/// Unwraps a Franz result, dispatching each known error to its recovery
/// helper.
///
/// `Ok(value)` yields `value`. `ProducerDown`, `RequestTimedOut` and
/// `MessageTooLarge` call `restart_producer`, `retry_with_backoff` and
/// `split_and_retry` respectively; any other error is passed to `log_error`.
pub fn handle_kafka_result(result: Result(a, franz.FranzError)) {
  case result {
    Ok(value) -> value
    // Handle producer failure
    Error(franz.ProducerDown) -> restart_producer()
    // Handle timeout
    Error(franz.RequestTimedOut) -> retry_with_backoff()
    // Handle oversized message
    Error(franz.MessageTooLarge) -> split_and_retry()
    // Log and handle other errors
    Error(other) -> log_error(other)
  }
}

🎮 Configuration Reference

Client Configuration

| Option | Description | Default |
| --- | --- | --- |
| AutoStartProducers | Automatically start producers for topics | False |
| ClientId | Client identifier for tracking | "gleam_franz" |
| QueryApiVersions | Query broker API versions | True |
| ReconnectCoolDownSeconds | Cooldown between reconnection attempts | 1 |

Producer Configuration

| Option | Description | Default |
| --- | --- | --- |
| RequiredAcks | Number of acknowledgments required | 1 |
| Compression | Compression algorithm (None, Gzip, Snappy, Lz4) | None |
| BatchSize | Maximum batch size in bytes | 16384 |
| LingerMs | Time to wait for batching | 0 |
| MaxRetries | Maximum number of retries | 2 |

Consumer Configuration

| Option | Description | Default |
| --- | --- | --- |
| MinBytes | Minimum bytes to fetch | 1 |
| MaxBytes | Maximum bytes to fetch | 1048576 |
| MaxWaitTime | Maximum wait time in ms | 1000 |
| BeginOffset | Starting offset (Latest, Earliest, offset) | Latest |

๐Ÿ—๏ธ Architecture

Franz is built on top of the battle-tested brod Erlang client, providing:

๐Ÿค Contributing

We welcome contributions! Please see our Contributing Guide for details.

# Run tests
gleam test

# Format code
gleam format

# Type check
gleam check

📜 License

Franz is released under the MIT License. See the LICENSE file for details.

๐Ÿ™ Acknowledgments


Made with 💜 by the Gleam community

Report Bug | Request Feature | Join Discord

✨ Search Document