glixir/syn

Distributed process coordination using Erlang’s syn library.

Provides type-safe wrappers for syn’s registry and PubSub functionality, enabling distributed service discovery and event streaming across BEAM nodes.

Inspired by the excellent glyn project’s syn wrapper patterns. Check out glyn for more advanced OTP patterns: https://github.com/glyn-project/glyn

This implementation is optimized for glixir’s type-safe OTP wrappers.

Types

PubSub operation errors

pub type PubSubError {
  JoinFailed(String)
  LeaveFailed(String)
  PublishFailed(String)
}

Constructors

  • JoinFailed(String)
  • LeaveFailed(String)
  • PublishFailed(String)

Registry operation errors

pub type RegistryError {
  RegistrationFailed(String)
  LookupFailed(String)
  UnregistrationFailed(String)
}

Constructors

  • RegistrationFailed(String)
  • LookupFailed(String)
  • UnregistrationFailed(String)
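Both error types wrap a single String, so callers can turn any failure into a log line with an exhaustive case expression. The sketch below assumes the module is imported as glixir/syn (taken from the page title) and that the String payload is a human-readable reason, which this page does not state explicitly. The two describe_* helpers are hypothetical names, not part of the library.

```gleam
import glixir/syn

/// Turn a PubSub error into a log-friendly message.
/// Assumption: the String payload is a readable failure reason.
pub fn describe_pubsub_error(error: syn.PubSubError) -> String {
  case error {
    syn.JoinFailed(reason) -> "join failed: " <> reason
    syn.LeaveFailed(reason) -> "leave failed: " <> reason
    syn.PublishFailed(reason) -> "publish failed: " <> reason
  }
}

/// Turn a registry error into a log-friendly message.
pub fn describe_registry_error(error: syn.RegistryError) -> String {
  case error {
    syn.RegistrationFailed(reason) -> "registration failed: " <> reason
    syn.LookupFailed(reason) -> "lookup failed: " <> reason
    syn.UnregistrationFailed(reason) -> "unregistration failed: " <> reason
  }
}
```

Because each case lists every constructor, the compiler will flag these helpers if a constructor is added to either type later.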

Values

pub fn broadcast_status(
  group: String,
  status_message: json.Json,
) -> Result(Int, PubSubError)

Broadcast a status update to all processes in a coordination group. Common pattern for distributed consensus or health monitoring.
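A health reporter would typically join the group before broadcasting to it. The following is a minimal sketch under stated assumptions: health_status and report_health are hypothetical helpers, the JSON field names are illustrative, and the returned Int is presumed to be a recipient count as it is for publish (this page does not confirm that for broadcast_status).

```gleam
import gleam/json
import glixir/syn

/// Build the status payload. Field names are illustrative only.
pub fn health_status(node: String, load: Float, timestamp: Int) -> json.Json {
  json.object([
    #("node", json.string(node)),
    #("load", json.float(load)),
    #("timestamp", json.int(timestamp)),
  ])
}

/// Join the coordination group, then broadcast this node's status to it.
/// A failed join is returned as-is and nothing is broadcast.
pub fn report_health(
  node: String,
  load: Float,
  timestamp: Int,
) -> Result(Int, syn.PubSubError) {
  case syn.join_coordination("health_check") {
    Ok(Nil) ->
      syn.broadcast_status("health_check", health_status(node, load, timestamp))
    Error(error) -> Error(error)
  }
}
```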

Example

let status = json.object([
  #("node", json.string(node_name)),
  #("load", json.float(cpu_load)),
  #("timestamp", json.int(now))
])

syn.broadcast_status("health_check", status)

pub fn find_worker(
  pool_name: String,
  worker_id: String,
) -> Result(#(process.Pid, load_info), RegistryError)

Find a worker in a pool and get its load information.
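One way to use the returned load information is to look up several workers and route to the one with the shortest queue. This sketch assumes workers were registered with load_info shaped as #(cpu_usage, queue_size), a Float and an Int. least_queued and pick_worker are hypothetical helpers, and the caller is assumed to already know the candidate worker ids.

```gleam
import gleam/erlang/process
import gleam/int
import gleam/list
import gleam/result
import glixir/syn

/// Pick the candidate with the smallest queue size.
/// Generic over the handle type so it can be exercised without real Pids.
pub fn least_queued(candidates: List(#(a, #(Float, Int)))) -> Result(a, Nil) {
  candidates
  |> list.sort(fn(left, right) {
    let #(_, #(_, left_queue)) = left
    let #(_, #(_, right_queue)) = right
    int.compare(left_queue, right_queue)
  })
  |> list.first
  |> result.map(fn(candidate) { candidate.0 })
}

/// Look up each known worker id, skip the ones that cannot be found,
/// and return the Pid of the least loaded worker.
pub fn pick_worker(
  pool: String,
  worker_ids: List(String),
) -> Result(process.Pid, Nil) {
  worker_ids
  |> list.filter_map(fn(id) { syn.find_worker(pool, id) })
  |> least_queued
}
```

The load_info type is generic in find_worker's return position, so it is fixed here only by how least_queued consumes it. It should match whatever shape the workers passed to register_worker.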

Example

case syn.find_worker("image_processors", worker_id) {
  Ok(#(pid, #(cpu_usage, queue_size))) -> todo as "send work to the worker"
  Error(_) -> todo as "worker not available"
}

pub fn init_scopes(scopes: List(String)) -> Nil

Initialize scopes for distributed coordination. Call this once at application startup for each scope you plan to use.
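Since scopes are plain strings, a misspelled scope name would only show up at runtime. A possible convention, sketched here with hypothetical constant and function names, is to define the scope names once and pass the same constants to init_scopes and to later register or join calls.

```gleam
import glixir/syn

// Hypothetical scope names, defined once so a typo becomes a compile error.
pub const sessions_scope = "user_sessions"

pub const lobbies_scope = "game_lobbies"

pub const workers_scope = "worker_pool"

/// Every scope the application intends to use.
pub fn all_scopes() -> List(String) {
  [sessions_scope, lobbies_scope, workers_scope]
}

/// Call once from the application's start-up path,
/// before any register or join call that uses these scopes.
pub fn start_coordination() -> Nil {
  syn.init_scopes(all_scopes())
}
```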

Example

syn.init_scopes(["user_sessions", "game_lobbies", "worker_pool"])

pub fn join(
  scope: String,
  group: String,
) -> Result(Nil, PubSubError)

Join a PubSub group to receive published messages. The current process will receive all messages published to this group.

Example

syn.join("chat_rooms", "general")
// Process will now receive all messages published to "general" chat

pub fn join_coordination(
  group: String,
) -> Result(Nil, PubSubError)

Join a coordination group for distributed algorithm participation. Common pattern for consensus, leader election, or load balancing.

Example

syn.join_coordination("leader_election")
// Process will receive leader election messages

pub fn leave(
  scope: String,
  group: String,
) -> Result(Nil, PubSubError)

Leave a PubSub group to stop receiving messages. The current process will no longer receive messages published to this group.

Example

syn.leave("chat_rooms", "general")

pub fn leave_coordination(
  group: String,
) -> Result(Nil, PubSubError)

Leave a coordination group.

Example

syn.leave_coordination("leader_election")

pub fn member_count(scope: String, group: String) -> Int

Get count of processes subscribed to a group. More efficient than getting the full member list if you only need the count.

Example

let user_count = syn.member_count("chat_rooms", "general")
case user_count > 100 {
  True -> todo as "maybe split the room"
  False -> Nil
}

pub fn members(scope: String, group: String) -> List(process.Pid)

Get list of all process PIDs subscribed to a group. Useful for debugging or monitoring subscriber counts.

Example

let active_users = syn.members("user_sessions", "online")
io.println("Active users: " <> string.inspect(list.length(active_users)))

pub fn publish(
  scope: String,
  group: String,
  message: String,
) -> Result(Int, PubSubError)

Publish a string message to all members of a PubSub group. Returns the number of processes the message was delivered to.

Example

case syn.publish("chat_rooms", "general", "Hello everyone!") {
  Ok(count) -> todo as "message sent to `count` processes"
  Error(_) -> todo as "publish failed"
}

pub fn publish_json(
  scope: String,
  group: String,
  message: message,
  encoder: fn(message) -> json.Json,
) -> Result(Int, PubSubError)

Publish a typed message using JSON encoding. The message will be JSON-encoded before sending to subscribers.
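The encoder argument lets callers publish their own types rather than pre-built JSON. As a sketch, ChatMessage, encode_chat_message, and say below are hypothetical application code, not part of the library. Only the publish_json call itself comes from this module.

```gleam
import gleam/json
import glixir/syn

/// Hypothetical application message type.
pub type ChatMessage {
  ChatMessage(user: String, content: String, timestamp: Int)
}

/// Encoder with the shape publish_json expects: fn(message) -> json.Json.
pub fn encode_chat_message(message: ChatMessage) -> json.Json {
  json.object([
    #("user", json.string(message.user)),
    #("content", json.string(message.content)),
    #("timestamp", json.int(message.timestamp)),
  ])
}

/// Publish a typed chat message to a room in the "chat_rooms" scope.
pub fn say(room: String, message: ChatMessage) -> Result(Int, syn.PubSubError) {
  syn.publish_json("chat_rooms", room, message, encode_chat_message)
}
```

Keeping the encoder next to the type means every publisher of ChatMessage produces the same JSON shape.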

Example

let user_message = json.object([
  #("user", json.string("alice")),
  #("content", json.string("Hello!")),
  #("timestamp", json.int(system_time))
])

syn.publish_json("chat_rooms", "general", user_message, fn(j) { j })

pub fn register(
  scope: String,
  name: String,
  metadata: metadata,
) -> Result(Nil, RegistryError)

Register the current process in a scope with metadata. The process can be looked up later by other nodes using the scope and name.

Example

// Register a user session
syn.register("user_sessions", user_id, #(username, last_active))

pub fn register_worker(
  pool_name: String,
  worker_id: String,
  load_info: load_info,
) -> Result(Nil, RegistryError)

Register a worker process with load information. Common pattern for distributed worker pools.

Example

syn.register_worker("image_processors", worker_id, #(cpu_usage, queue_size))

pub fn unregister(
  scope: String,
  name: String,
) -> Result(Nil, RegistryError)

Unregister a process by scope and name. The process will no longer be discoverable by other nodes.

Example

syn.unregister("user_sessions", user_id)

pub fn whereis(
  scope: String,
  name: String,
) -> Result(#(process.Pid, metadata), RegistryError)

Look up a registered process by scope and name. Returns the process PID and its metadata if found.
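The metadata type in whereis is generic, so the caller's annotation decides what type the lookup yields. A sketch of keeping registration and lookup in agreement through a shared alias follows. SessionInfo, open_session, lookup_session, and presence_label are hypothetical names, and whether the library checks the stored metadata against the requested type at runtime is not stated on this page.

```gleam
import gleam/erlang/process
import glixir/syn

/// Hypothetical metadata shape: #(username, last_active).
pub type SessionInfo =
  #(String, Int)

/// Register the current process as the session for this user.
pub fn open_session(
  user_id: String,
  username: String,
  now: Int,
) -> Result(Nil, syn.RegistryError) {
  let info: SessionInfo = #(username, now)
  syn.register("user_sessions", user_id, info)
}

/// Look the session up again; the return annotation fixes the metadata type.
pub fn lookup_session(
  user_id: String,
) -> Result(#(process.Pid, SessionInfo), syn.RegistryError) {
  syn.whereis("user_sessions", user_id)
}

/// Summarise a lookup result for display.
pub fn presence_label(
  lookup: Result(#(process.Pid, SessionInfo), syn.RegistryError),
) -> String {
  case lookup {
    Ok(#(_pid, #(username, _last_active))) -> username <> " is online"
    Error(_) -> "offline"
  }
}
```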

Example

case syn.whereis("user_sessions", user_id) {
  Ok(#(pid, #(username, last_active))) -> todo as "send message to user"
  Error(_) -> todo as "user not online"
}