PhoenixGenApi (PhoenixGenApi v2.11.0)

Copy Markdown View Source

PhoenixGenApi is a framework for building distributed API systems with Phoenix.

This library provides a comprehensive solution for handling API requests with support for multiple execution modes (sync, async, streaming), distributed node selection, permission checking, and automatic argument validation.

Features

  • Multiple Execution Modes: Support for synchronous, asynchronous, streaming, and fire-and-forget requests
  • Distributed Execution: Execute functions on remote nodes with automatic node selection
  • Node Selection Strategies: Random, hash-based, round-robin, and custom selection strategies
  • Automatic Argument Validation: Type checking and conversion for request arguments
  • Permission Control: Built-in permission checking for requests
  • Streaming Support: Handle long-running operations with streaming responses
  • Configuration Caching: Efficient caching of function configurations with automatic updates
  • Configuration Push: Remote nodes can actively push their service and function configs to the gateway
  • Rate Limiting: Global and per-API rate limiting with sliding window algorithm

Architecture

The library consists of several key components:

Usage Example

Basic Setup

First, define your function configurations:

config = %PhoenixGenApi.Structs.FunConfig{
  request_type: "get_user",
  service: "user_service",
  nodes: ["user@node1", "user@node2"],
  choose_node_mode: :random,
  timeout: 5000,
  mfa: {UserService, :get_user, []},
  arg_types: %{"user_id" => :string},
  arg_orders: ["user_id"],
  response_type: :sync,
  check_permission: {:arg, "user_id"},
  request_info: false
}

# Add configuration to cache
PhoenixGenApi.ConfigDb.add(config)

Execute Requests

use PhoenixGenApi

# Create a request
request = %PhoenixGenApi.Structs.Request{
  request_id: "req_123",
  request_type: "get_user",
  user_id: "user_456",
  device_id: "device_789",
  args: %{"user_id" => "user_123"}
}

# Execute the request
response = PhoenixGenApi.Executor.execute!(request)

Streaming Requests

For long-running operations, use streaming mode:

stream_config = %PhoenixGenApi.Structs.FunConfig{
  request_type: "process_data",
  service: "processing_service",
  nodes: :local,
  choose_node_mode: :random,
  timeout: :infinity,
  mfa: {DataProcessor, :process_large_dataset, []},
  arg_types: %{"dataset_id" => :string},
  arg_orders: ["dataset_id"],
  response_type: :stream,
  check_permission: false,
  request_info: true
}

# The streaming function should send results using StreamHelper:
# StreamHelper.send_result(stream, chunk_data)
# StreamHelper.send_last_result(stream, final_data)
# Or: StreamHelper.send_complete(stream)

Configuration

Add to your config.exs:

config :phoenix_gen_api, :gen_api,
  pull_timeout: 5_000,
  pull_interval: 30_000,
  detail_error: false,
  service_configs: [
    %{
      service: "user_service",
      nodes: ["user@node1", "user@node2"],
      module: "UserService",
      function: "get_config",
      args: []
    }
  ]

Rate Limiting Configuration

Configure rate limits in your config.exs:

config :phoenix_gen_api, :rate_limiter,
  enabled: true,
  fail_open: true,
  global_limits: [
    %{key: :user_id, max_requests: 2000, window_ms: 60_000},
    %{key: :device_id, max_requests: 10000, window_ms: 60_000}
  ],
  api_limits: [
    %{
      service: "data_service",
      request_type: "export_data",
      key: :user_id,
      max_requests: 10,
      window_ms: 60_000
    }
  ]

Learn More

For detailed information about specific components, see:

Summary

Functions

Adds a single global rate limit at runtime.

Attaches a telemetry handler to all PhoenixGenApi events.

[Shell Helper] Quick view of ConfigDb cache status.

Checks rate limit for a request.

Detaches all telemetry handlers with the given ID.

Gets the current global rate limits (may differ from config.exs if changed at runtime).

Gets all configured rate limits.

Gets current rate limit status for a key.

[Shell Helper] Quick view of Worker Pool status.

Pushes a PushConfig to this server node.

[Shell Helper] Quick view of pushed services status.

Removes a global rate limit by key at runtime.

Resets rate limit counters for a specific key.

[Shell Helper] Quick view of current rate limit configuration.

[Shell Helper] Quick view and management of global rate limits.

[Shell Helper] Quick view of rate limit status for a user.

Sets (replaces) all global rate limits at runtime.

Stops an active streaming call.

Updates rate limit configuration at runtime.

Verifies that the server has the given service and config version.

Functions

add_global_limit(limit)

@spec add_global_limit(map()) :: :ok

Adds a single global rate limit at runtime.

If a limit with the same :key already exists, it will be replaced.

Parameters

  • limit - A map with :key, :max_requests, and :window_ms

Returns

  • :ok - Limit was added

Examples

PhoenixGenApi.add_global_limit(%{
  key: :ip_address,
  max_requests: 100,
  window_ms: 60_000
})

attach_telemetry(handler_id, function, config \\ %{})

@spec attach_telemetry(String.t(), function(), map()) :: :ok

Attaches a telemetry handler to all PhoenixGenApi events.

This is a convenience function that attaches handlers to both executor and rate limiter events with a single call.

Events

Executor Events

  • [:phoenix_gen_api, :executor, :request, :start]
  • [:phoenix_gen_api, :executor, :request, :stop]
  • [:phoenix_gen_api, :executor, :request, :exception]

Rate Limiter Events

  • [:phoenix_gen_api, :rate_limiter, :check]
  • [:phoenix_gen_api, :rate_limiter, :exceeded]
  • [:phoenix_gen_api, :rate_limiter, :reset]
  • [:phoenix_gen_api, :rate_limiter, :cleanup]

Parameters

  • handler_id - A unique string identifier for the handler
  • function - A 4-arity function: fn(event, measurements, metadata, config) -> any end
  • config - Optional configuration map passed to the handler (default: %{})

Examples

# Attach a simple logging handler
PhoenixGenApi.attach_telemetry("my-app", fn event, measurements, metadata, _config ->
  ...
end)

# Attach with custom config
PhoenixGenApi.attach_telemetry("metrics", &MyApp.Metrics.handle_event/4, %{prefix: "phoenix_gen_api"})

cache_status()

[Shell Helper] Quick view of ConfigDb cache status.

Usage in IEx

iex> PhoenixGenApi.cache_status()

check_rate_limit(request)

@spec check_rate_limit(PhoenixGenApi.Structs.Request.t()) ::
  :ok | {:error, :rate_limited, map()}

Checks rate limit for a request.

This function checks both global and per-API rate limits. It is automatically called during request execution, but can also be called manually for custom rate limiting logic.

Parameters

  • request - The Request struct to check

Returns

  • :ok - Request is within all rate limits
  • {:error, :rate_limited, details} - Request exceeds a rate limit

Examples

request = %Request{user_id: "user_123", service: "my_service", request_type: "my_api"}

case PhoenixGenApi.check_rate_limit(request) do
  :ok ->
    # Proceed with execution

  {:error, :rate_limited, details} ->
    # Handle rate limit exceeded
end

detach_telemetry(handler_id)

@spec detach_telemetry(String.t()) :: :ok

Detaches all telemetry handlers with the given ID.

Parameters

  • handler_id - The handler ID used when attaching

Examples

PhoenixGenApi.detach_telemetry("my-app")

get_global_limits()

@spec get_global_limits() :: [map()]

Gets the current global rate limits (may differ from config.exs if changed at runtime).

Returns

A list of global rate limit maps.

Examples

PhoenixGenApi.get_global_limits()
# => [%{key: :user_id, max_requests: 2000, window_ms: 60_000}]

get_rate_limit_config()

@spec get_rate_limit_config() :: %{global: list(), api: list()}

Gets all configured rate limits.

Returns

A map with :global and :api keys containing the configured limits.

get_rate_limit_status(key_value, scope, rate_limit_key)

@spec get_rate_limit_status(
  String.t(),
  :global | {String.t() | atom(), String.t()},
  atom() | String.t()
) :: [map()]

Gets current rate limit status for a key.

Returns

A list of maps with current usage information for all applicable rate limits.

pool_status()

[Shell Helper] Quick view of Worker Pool status.

Usage in IEx

iex> PhoenixGenApi.pool_status()

push_config(push_config, opts \\ [])

@spec push_config(
  PhoenixGenApi.Structs.PushConfig.t() | map(),
  keyword()
) :: {:ok, :accepted} | {:ok, :skipped, term()} | {:error, term()}

Pushes a PushConfig to this server node.

This is the server-side API for receiving pushed configs from remote nodes. Remote nodes should use ConfigPusher.push/2 or ConfigPusher.push_on_startup/3 instead, which make RPC calls to this function.

Parameters

  • push_config - A %PushConfig{} struct or map that can be decoded into one
  • opts - Options keyword list:
    • :force - Force push even if version matches (default: false)

Returns

  • {:ok, :accepted} - New configs were stored successfully
  • {:ok, :skipped, reason} - Push was skipped (e.g., version matches)
  • {:error, reason} - Push failed (validation error, etc.)

Examples

alias PhoenixGenApi.Structs.PushConfig

push_config = %PushConfig{
  service: "my_service",
  nodes: [:"node1@host"],
  config_version: "1.0.0",
  fun_configs: [%FunConfig{...}]
}

{:ok, :accepted} = PhoenixGenApi.push_config(push_config)
{:ok, :skipped, :version_matches} = PhoenixGenApi.push_config(push_config)
{:ok, :accepted} = PhoenixGenApi.push_config(push_config, force: true)

pushed_services_status()

[Shell Helper] Quick view of pushed services status.

Shows all services that have been registered via push, along with their config versions and auto-pull registration status.

Usage in IEx

iex> PhoenixGenApi.pushed_services_status()

remove_global_limit(key)

@spec remove_global_limit(atom() | String.t()) :: :ok

Removes a global rate limit by key at runtime.

Parameters

  • key - The rate limit key to remove (:user_id, :device_id, etc.)

Returns

  • :ok - Limit was removed (or didn't exist)

Examples

PhoenixGenApi.remove_global_limit(:ip_address)

reset_rate_limit(key_value, scope, rate_limit_key)

@spec reset_rate_limit(
  String.t(),
  :global | {String.t() | atom(), String.t()},
  atom() | String.t()
) :: :ok

Resets rate limit counters for a specific key.

Parameters

  • key_value - The key value to reset (e.g., user ID)
  • scope - Either :global or {service, request_type} tuple
  • rate_limit_key - The type of key (:user_id, :device_id, etc.)

Returns

  • :ok - Counters were reset

Examples

# Reset all rate limits for a user
PhoenixGenApi.reset_rate_limit("user_123", :global, :user_id)

# Reset API-specific rate limit
PhoenixGenApi.reset_rate_limit("user_123", {"my_service", "my_api"}, :user_id)

rl_config()

[Shell Helper] Quick view of current rate limit configuration.

Usage in IEx

iex> PhoenixGenApi.rl_config()

rl_global()

[Shell Helper] Quick view and management of global rate limits.

Usage in IEx

# View current global limits
iex> PhoenixGenApi.rl_global()

# Set new global limits
iex> PhoenixGenApi.rl_global([%{key: :user_id, max_requests: 2000, window_ms: 60_000}])

# Add a single limit
iex> PhoenixGenApi.rl_global(:add, %{key: :ip_address, max_requests: 100, window_ms: 60_000})

# Remove a limit by key
iex> PhoenixGenApi.rl_global(:remove, :ip_address)

rl_global(limits)

rl_global(atom, limit)

rl_status(user_id)

[Shell Helper] Quick view of rate limit status for a user.

Usage in IEx

iex> PhoenixGenApi.rl_status("user_123")

set_global_limits(limits)

@spec set_global_limits([map()]) :: :ok

Sets (replaces) all global rate limits at runtime.

Parameters

  • limits - A list of global rate limit maps, each with:
    • :key - The rate limit key (:user_id, :device_id, :ip_address, or custom string)
    • :max_requests - Maximum requests allowed in the window
    • :window_ms - Window duration in milliseconds

Returns

  • :ok - Limits were updated

Examples

PhoenixGenApi.set_global_limits([
  %{key: :user_id, max_requests: 2000, window_ms: 60_000},
  %{key: :device_id, max_requests: 10000, window_ms: 60_000}
])

stop_stream(request_id)

@spec stop_stream(pid()) :: :ok

Stops an active streaming call.

This function gracefully terminates a streaming call process and sends a completion message to the receiver. The stream call process is identified by its PID.

Parameters

  • stream_pid - The PID of the streaming call process to stop

Returns

  • :ok - The stop signal was sent successfully

Examples

# Start a stream
{:ok, stream_pid} = StreamCall.start_link(%{
  request: request,
  fun_config: config,
  receiver: self()
})

# Later, stop the stream
PhoenixGenApi.stop_stream(stream_pid)

# Receive the completion message
receive do
  {:stream_response, response} ->
    assert response.has_more == false
end

Notes

  • The stream call will send a completion response to its receiver before terminating
  • This does not notify the data generator process; it only stops the stream relay
  • If you need to stop the data generation itself, handle that in your generator function

update_rate_limit_config(config)

@spec update_rate_limit_config(map()) :: :ok

Updates rate limit configuration at runtime.

Parameters

  • config - A map with :global_limits and/or :api_limits keys

Returns

  • :ok - Configuration was updated

Examples

PhoenixGenApi.update_rate_limit_config(%{
  global_limits: [
    %{key: :user_id, max_requests: 2000, window_ms: 60_000}
  ]
})

verify_config(service, config_version)

@spec verify_config(String.t() | atom(), String.t()) ::
  {:ok, :matched} | {:ok, :mismatch, String.t()} | {:error, :not_found}

Verifies that the server has the given service and config version.

Useful for checking whether a push is necessary before sending the full configuration. Remote nodes should use ConfigPusher.verify/3 instead.

Parameters

  • service - The service name (string or atom)
  • config_version - The config version string to verify

Returns

  • {:ok, :matched} - Version matches what is stored
  • {:ok, :mismatch, stored_version} - Version differs from what is stored
  • {:error, :not_found} - Service is not known

Examples

{:ok, :matched} = PhoenixGenApi.verify_config("my_service", "1.0.0")
{:ok, :mismatch, "0.9.0"} = PhoenixGenApi.verify_config("my_service", "1.0.0")
{:error, :not_found} = PhoenixGenApi.verify_config("unknown_service", "1.0.0")