PhoenixGenApi is a framework for building distributed API systems with Phoenix.
This library provides a comprehensive solution for handling API requests with support for multiple execution modes (sync, async, streaming), distributed node selection, permission checking, and automatic argument validation.
Features
- Multiple Execution Modes: Support for synchronous, asynchronous, streaming, and fire-and-forget requests
- Distributed Execution: Execute functions on remote nodes with automatic node selection
- Node Selection Strategies: Random, hash-based, round-robin, and custom selection strategies
- Automatic Argument Validation: Type checking and conversion for request arguments
- Permission Control: Built-in permission checking for requests
- Streaming Support: Handle long-running operations with streaming responses
- Configuration Caching: Efficient caching of function configurations with automatic updates
- Configuration Push: Remote nodes can actively push their service and function configs to the gateway
- Rate Limiting: Global and per-API rate limiting with sliding window algorithm
Architecture
The library consists of several key components:
PhoenixGenApi.Executor- Core execution engine for processing requestsPhoenixGenApi.ConfigDb- Caches function configurations for fast lookupPhoenixGenApi.ConfigPuller- Pulls and updates configurations from remote servicesPhoenixGenApi.ConfigReceiver- Receives pushed configurations from remote nodes (server-side)PhoenixGenApi.ConfigPusher- Pushes configurations to the gateway node (client-side)PhoenixGenApi.NodeSelector- Selects target nodes based on configured strategiesPhoenixGenApi.Permission- Handles permission checking for requestsPhoenixGenApi.ArgumentHandler- Validates and converts request argumentsPhoenixGenApi.StreamCall- Manages streaming function callsPhoenixGenApi.RateLimiter- Rate limiting for global and per-API requests
Usage Example
Basic Setup
First, define your function configurations:
config = %PhoenixGenApi.Structs.FunConfig{
request_type: "get_user",
service: "user_service",
nodes: ["user@node1", "user@node2"],
choose_node_mode: :random,
timeout: 5000,
mfa: {UserService, :get_user, []},
arg_types: %{"user_id" => :string},
arg_orders: ["user_id"],
response_type: :sync,
check_permission: {:arg, "user_id"},
request_info: false
}
# Add configuration to cache
PhoenixGenApi.ConfigDb.add(config)Execute Requests
use PhoenixGenApi
# Create a request
request = %PhoenixGenApi.Structs.Request{
request_id: "req_123",
request_type: "get_user",
user_id: "user_456",
device_id: "device_789",
args: %{"user_id" => "user_123"}
}
# Execute the request
response = PhoenixGenApi.Executor.execute!(request)Streaming Requests
For long-running operations, use streaming mode:
stream_config = %PhoenixGenApi.Structs.FunConfig{
request_type: "process_data",
service: "processing_service",
nodes: :local,
choose_node_mode: :random,
timeout: :infinity,
mfa: {DataProcessor, :process_large_dataset, []},
arg_types: %{"dataset_id" => :string},
arg_orders: ["dataset_id"],
response_type: :stream,
check_permission: false,
request_info: true
}
# The streaming function should send results using StreamHelper:
# StreamHelper.send_result(stream, chunk_data)
# StreamHelper.send_last_result(stream, final_data)
# Or: StreamHelper.send_complete(stream)Configuration
Add to your config.exs:
config :phoenix_gen_api, :gen_api,
pull_timeout: 5_000,
pull_interval: 30_000,
detail_error: false,
service_configs: [
%{
service: "user_service",
nodes: ["user@node1", "user@node2"],
module: "UserService",
function: "get_config",
args: []
}
]Rate Limiting Configuration
Configure rate limits in your config.exs:
config :phoenix_gen_api, :rate_limiter,
enabled: true,
fail_open: true,
global_limits: [
%{key: :user_id, max_requests: 2000, window_ms: 60_000},
%{key: :device_id, max_requests: 10000, window_ms: 60_000}
],
api_limits: [
%{
service: "data_service",
request_type: "export_data",
key: :user_id,
max_requests: 10,
window_ms: 60_000
}
]Learn More
For detailed information about specific components, see:
PhoenixGenApi.Executor- Request executionPhoenixGenApi.Structs.FunConfig- Function configurationPhoenixGenApi.Structs.Request- Request structurePhoenixGenApi.Structs.Response- Response structurePhoenixGenApi.NodeSelector- Node selection strategies
Summary
Functions
Adds a single global rate limit at runtime.
Attaches a telemetry handler to all PhoenixGenApi events.
[Shell Helper] Quick view of ConfigDb cache status.
Checks rate limit for a request.
Detaches all telemetry handlers with the given ID.
Gets the current global rate limits (may differ from config.exs if changed at runtime).
Gets all configured rate limits.
Gets current rate limit status for a key.
[Shell Helper] Quick view of Worker Pool status.
Pushes a PushConfig to this server node.
[Shell Helper] Quick view of pushed services status.
Removes a global rate limit by key at runtime.
Resets rate limit counters for a specific key.
[Shell Helper] Quick view of current rate limit configuration.
[Shell Helper] Quick view and management of global rate limits.
[Shell Helper] Quick view of rate limit status for a user.
Sets (replaces) all global rate limits at runtime.
Stops an active streaming call.
Updates rate limit configuration at runtime.
Verifies that the server has the given service and config version.
Functions
@spec add_global_limit(map()) :: :ok
Adds a single global rate limit at runtime.
If a limit with the same :key already exists, it will be replaced.
Parameters
limit- A map with:key,:max_requests, and:window_ms
Returns
:ok- Limit was added
Examples
PhoenixGenApi.add_global_limit(%{
key: :ip_address,
max_requests: 100,
window_ms: 60_000
})
Attaches a telemetry handler to all PhoenixGenApi events.
This is a convenience function that attaches handlers to both executor and rate limiter events with a single call.
Events
Executor Events
[:phoenix_gen_api, :executor, :request, :start][:phoenix_gen_api, :executor, :request, :stop][:phoenix_gen_api, :executor, :request, :exception]
Rate Limiter Events
[:phoenix_gen_api, :rate_limiter, :check][:phoenix_gen_api, :rate_limiter, :exceeded][:phoenix_gen_api, :rate_limiter, :reset][:phoenix_gen_api, :rate_limiter, :cleanup]
Parameters
handler_id- A unique string identifier for the handlerfunction- A 4-arity function: fn(event, measurements, metadata, config) -> any endconfig- Optional configuration map passed to the handler (default: %{})
Examples
# Attach a simple logging handler
PhoenixGenApi.attach_telemetry("my-app", fn event, measurements, metadata, _config ->
...
end)
# Attach with custom config
PhoenixGenApi.attach_telemetry("metrics", &MyApp.Metrics.handle_event/4, %{prefix: "phoenix_gen_api"})
[Shell Helper] Quick view of ConfigDb cache status.
Usage in IEx
iex> PhoenixGenApi.cache_status()
@spec check_rate_limit(PhoenixGenApi.Structs.Request.t()) :: :ok | {:error, :rate_limited, map()}
Checks rate limit for a request.
This function checks both global and per-API rate limits. It is automatically called during request execution, but can also be called manually for custom rate limiting logic.
Parameters
request- TheRequeststruct to check
Returns
:ok- Request is within all rate limits{:error, :rate_limited, details}- Request exceeds a rate limit
Examples
request = %Request{user_id: "user_123", service: "my_service", request_type: "my_api"}
case PhoenixGenApi.check_rate_limit(request) do
:ok ->
# Proceed with execution
{:error, :rate_limited, details} ->
# Handle rate limit exceeded
end
@spec detach_telemetry(String.t()) :: :ok
Detaches all telemetry handlers with the given ID.
Parameters
handler_id- The handler ID used when attaching
Examples
PhoenixGenApi.detach_telemetry("my-app")
@spec get_global_limits() :: [map()]
Gets the current global rate limits (may differ from config.exs if changed at runtime).
Returns
A list of global rate limit maps.
Examples
PhoenixGenApi.get_global_limits()
# => [%{key: :user_id, max_requests: 2000, window_ms: 60_000}]
Gets all configured rate limits.
Returns
A map with :global and :api keys containing the configured limits.
@spec get_rate_limit_status( String.t(), :global | {String.t() | atom(), String.t()}, atom() | String.t() ) :: [map()]
Gets current rate limit status for a key.
Returns
A list of maps with current usage information for all applicable rate limits.
[Shell Helper] Quick view of Worker Pool status.
Usage in IEx
iex> PhoenixGenApi.pool_status()
@spec push_config( PhoenixGenApi.Structs.PushConfig.t() | map(), keyword() ) :: {:ok, :accepted} | {:ok, :skipped, term()} | {:error, term()}
Pushes a PushConfig to this server node.
This is the server-side API for receiving pushed configs from remote nodes.
Remote nodes should use ConfigPusher.push/2 or ConfigPusher.push_on_startup/3
instead, which make RPC calls to this function.
Parameters
push_config- A%PushConfig{}struct or map that can be decoded into oneopts- Options keyword list::force- Force push even if version matches (default:false)
Returns
{:ok, :accepted}- New configs were stored successfully{:ok, :skipped, reason}- Push was skipped (e.g., version matches){:error, reason}- Push failed (validation error, etc.)
Examples
alias PhoenixGenApi.Structs.PushConfig
push_config = %PushConfig{
service: "my_service",
nodes: [:"node1@host"],
config_version: "1.0.0",
fun_configs: [%FunConfig{...}]
}
{:ok, :accepted} = PhoenixGenApi.push_config(push_config)
{:ok, :skipped, :version_matches} = PhoenixGenApi.push_config(push_config)
{:ok, :accepted} = PhoenixGenApi.push_config(push_config, force: true)
[Shell Helper] Quick view of pushed services status.
Shows all services that have been registered via push, along with their config versions and auto-pull registration status.
Usage in IEx
iex> PhoenixGenApi.pushed_services_status()
Removes a global rate limit by key at runtime.
Parameters
key- The rate limit key to remove (:user_id,:device_id, etc.)
Returns
:ok- Limit was removed (or didn't exist)
Examples
PhoenixGenApi.remove_global_limit(:ip_address)
@spec reset_rate_limit( String.t(), :global | {String.t() | atom(), String.t()}, atom() | String.t() ) :: :ok
Resets rate limit counters for a specific key.
Parameters
key_value- The key value to reset (e.g., user ID)scope- Either:globalor{service, request_type}tuplerate_limit_key- The type of key (:user_id,:device_id, etc.)
Returns
:ok- Counters were reset
Examples
# Reset all rate limits for a user
PhoenixGenApi.reset_rate_limit("user_123", :global, :user_id)
# Reset API-specific rate limit
PhoenixGenApi.reset_rate_limit("user_123", {"my_service", "my_api"}, :user_id)
[Shell Helper] Quick view of current rate limit configuration.
Usage in IEx
iex> PhoenixGenApi.rl_config()
[Shell Helper] Quick view and management of global rate limits.
Usage in IEx
# View current global limits
iex> PhoenixGenApi.rl_global()
# Set new global limits
iex> PhoenixGenApi.rl_global([%{key: :user_id, max_requests: 2000, window_ms: 60_000}])
# Add a single limit
iex> PhoenixGenApi.rl_global(:add, %{key: :ip_address, max_requests: 100, window_ms: 60_000})
# Remove a limit by key
iex> PhoenixGenApi.rl_global(:remove, :ip_address)
[Shell Helper] Quick view of rate limit status for a user.
Usage in IEx
iex> PhoenixGenApi.rl_status("user_123")
@spec set_global_limits([map()]) :: :ok
Sets (replaces) all global rate limits at runtime.
Parameters
limits- A list of global rate limit maps, each with::key- The rate limit key (:user_id,:device_id,:ip_address, or custom string):max_requests- Maximum requests allowed in the window:window_ms- Window duration in milliseconds
Returns
:ok- Limits were updated
Examples
PhoenixGenApi.set_global_limits([
%{key: :user_id, max_requests: 2000, window_ms: 60_000},
%{key: :device_id, max_requests: 10000, window_ms: 60_000}
])
@spec stop_stream(pid()) :: :ok
Stops an active streaming call.
This function gracefully terminates a streaming call process and sends a completion message to the receiver. The stream call process is identified by its PID.
Parameters
stream_pid- The PID of the streaming call process to stop
Returns
:ok- The stop signal was sent successfully
Examples
# Start a stream
{:ok, stream_pid} = StreamCall.start_link(%{
request: request,
fun_config: config,
receiver: self()
})
# Later, stop the stream
PhoenixGenApi.stop_stream(stream_pid)
# Receive the completion message
receive do
{:stream_response, response} ->
assert response.has_more == false
endNotes
- The stream call will send a completion response to its receiver before terminating
- This does not notify the data generator process; it only stops the stream relay
- If you need to stop the data generation itself, handle that in your generator function
@spec update_rate_limit_config(map()) :: :ok
Updates rate limit configuration at runtime.
Parameters
config- A map with:global_limitsand/or:api_limitskeys
Returns
:ok- Configuration was updated
Examples
PhoenixGenApi.update_rate_limit_config(%{
global_limits: [
%{key: :user_id, max_requests: 2000, window_ms: 60_000}
]
})
@spec verify_config(String.t() | atom(), String.t()) :: {:ok, :matched} | {:ok, :mismatch, String.t()} | {:error, :not_found}
Verifies that the server has the given service and config version.
Useful for checking whether a push is necessary before sending the full
configuration. Remote nodes should use ConfigPusher.verify/3 instead.
Parameters
service- The service name (string or atom)config_version- The config version string to verify
Returns
{:ok, :matched}- Version matches what is stored{:ok, :mismatch, stored_version}- Version differs from what is stored{:error, :not_found}- Service is not known
Examples
{:ok, :matched} = PhoenixGenApi.verify_config("my_service", "1.0.0")
{:ok, :mismatch, "0.9.0"} = PhoenixGenApi.verify_config("my_service", "1.0.0")
{:error, :not_found} = PhoenixGenApi.verify_config("unknown_service", "1.0.0")