LLM Gateway API - Elixir HTTP Wrapper for Local LLM Models
This guide shows how to build a production-ready HTTP API gateway that wraps your local LLM (running in Docker) and provides an OpenAI-compatible endpoint that works with LangChain.
Table of Contents
- Architecture Overview
- Why Elixir for LLM Gateway
- Complete Gateway Implementation
- Queue Mechanics & Concurrency Control
- API Endpoint Specification
- Configuration & Deployment
- Client Usage with LangChain
- Advanced Features
Architecture Overview
┌─────────────┐     HTTP/JSON      ┌──────────────────┐
│  LangChain  │ ─────────────────> │  Elixir Gateway  │
│   Client    │ <───────────────── │  (Phoenix API)   │
└─────────────┘  OpenAI-compatible └──────────────────┘
                                            │
                                    ┌───────┴────────┐
                                    │  Queue System  │
                                    │   (GenStage/   │
                                    │    Broadway)   │
                                    └───────┬────────┘
                                            │
                                      HTTP/gRPC/TCP
                                            │
                                    ┌───────▼────────┐
                                    │   Docker LLM   │
                                    │  (Ollama/vLLM/ │
                                    │    TGI/etc)    │
                                    └────────────────┘
Key Components:
- Phoenix API Server - HTTP endpoint handling, auth, validation
- Queue System - Request queuing, rate limiting, load balancing
- LLM Client - Communicates with your Docker container
- Response Streamer - SSE (Server-Sent Events) for streaming responses
Why Elixir for LLM Gateway
✅ Concurrency - Handle thousands of simultaneous connections with lightweight processes
✅ Queue Mechanics - Built-in GenStage/Broadway for backpressure and rate limiting
✅ Fault Tolerance - Supervisor trees for automatic recovery
✅ Hot Code Swapping - Update without downtime
✅ Low Latency - Ideal for real-time streaming responses
✅ OTP Platform - Battle-tested distributed systems framework
Complete Gateway Implementation
Project Setup
mix phx.new llm_gateway --no-ecto --no-html --no-assets
cd llm_gateway
Add dependencies to mix.exs:
defp deps do
[
{:phoenix, "~> 1.7.0"},
{:plug_cowboy, "~> 2.5"},
{:jason, "~> 1.4"},
{:finch, "~> 0.16"},
{:gen_stage, "~> 1.2"},
{:broadway, "~> 1.0"},
{:corsica, "~> 2.0"},
{:joken, "~> 2.6"}, # For JWT auth
{:telemetry, "~> 1.2"},
{:telemetry_metrics, "~> 1.0"}
]
end
1. API Router
# lib/llm_gateway_web/router.ex
defmodule LLMGatewayWeb.Router do
use Phoenix.Router
import Plug.Conn
pipeline :api do
  plug :accepts, ["json"]
  # CORS runs first so OPTIONS preflights (which carry no Authorization
  # header) are answered before authentication rejects them
  plug Corsica, origins: "*", allow_headers: ["authorization", "content-type"]
  plug LLMGatewayWeb.Plugs.APIAuth
  plug LLMGatewayWeb.Plugs.RateLimit
end
scope "/v1", LLMGatewayWeb do
pipe_through :api
# OpenAI-compatible endpoints
post "/chat/completions", ChatController, :create
get "/models", ModelsController, :index
get "/models/:model_id", ModelsController, :show
# Health check
get "/health", HealthController, :check
end
end
2. Authentication Plug
# lib/llm_gateway_web/plugs/api_auth.ex
defmodule LLMGatewayWeb.Plugs.APIAuth do
import Plug.Conn
require Logger
def init(opts), do: opts
def call(conn, _opts) do
# Skip auth for health check
if conn.request_path == "/v1/health" do
conn
else
verify_api_key(conn)
end
end
defp verify_api_key(conn) do
case get_req_header(conn, "authorization") do
["Bearer " <> api_key] ->
if valid_api_key?(api_key) do
assign(conn, :api_key, api_key)
else
unauthorized(conn)
end
_ ->
unauthorized(conn)
end
end
defp valid_api_key?(api_key) do
# Check against your API key store (ETS, Redis, Database)
api_keys = Application.get_env(:llm_gateway, :api_keys, [])
Enum.any?(api_keys, fn {key, _metadata} ->
Plug.Crypto.secure_compare(key, api_key)
end)
end
defp unauthorized(conn) do
conn
|> put_status(401)
|> Phoenix.Controller.json(%{
error: %{
message: "Invalid API key",
type: "invalid_request_error",
code: "invalid_api_key"
}
})
|> halt()
end
end
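The auth plug above exempts /v1/health, and the router expects a HealthController to serve it, but that controller never appears elsewhere in the walkthrough. Here is a minimal sketch (a deeper check could ping the LLM backend; that's left out):
# lib/llm_gateway_web/controllers/health_controller.ex
defmodule LLMGatewayWeb.HealthController do
  use Phoenix.Controller

  def check(conn, _params) do
    json(conn, %{status: "ok", timestamp: System.system_time(:second)})
  end
end
3. Rate Limiting Plug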
# lib/llm_gateway_web/plugs/rate_limit.ex
defmodule LLMGatewayWeb.Plugs.RateLimit do
import Plug.Conn
@ets_table :rate_limits
def init(opts), do: opts
def call(conn, _opts) do
api_key = conn.assigns[:api_key]
if api_key && rate_limited?(api_key) do
conn
|> put_status(429)
|> Phoenix.Controller.json(%{
error: %{
message: "Rate limit exceeded",
type: "rate_limit_error"
}
})
|> halt()
else
conn
end
end
defp rate_limited?(api_key) do
now = System.system_time(:second)
window = 60 # 1 minute window
limit = 60 # 60 requests per minute
key = {api_key, div(now, window)}
case :ets.update_counter(@ets_table, key, {2, 1}, {key, 0}) do
count when count > limit -> true
_count -> false
end
end
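
  # Optional sweep for stale windows (an addition, not required by the plug
  # above): update_counter/4 leaves one row per {api_key, window}, so old
  # windows accumulate. Call this from a periodic process, e.g. a GenServer
  # using Process.send_after/3.
  def sweep_stale(window_seconds \\ 60) do
    current_window = div(System.system_time(:second), window_seconds)

    # Delete every counter whose window id is older than the current window
    :ets.select_delete(@ets_table, [
      {{{:"$1", :"$2"}, :_}, [{:<, :"$2", current_window}], [true]}
    ])
  end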
end
4. Chat Controller (OpenAI-Compatible)
# lib/llm_gateway_web/controllers/chat_controller.ex
defmodule LLMGatewayWeb.ChatController do
use Phoenix.Controller
# Plug.Conn provides put_status/2, send_chunked/2, chunk/2, etc., which
# use Phoenix.Controller alone does not import
import Plug.Conn
require Logger
alias LLMGateway.LLMQueue
alias LLMGateway.LLMClient
@doc """
POST /v1/chat/completions
OpenAI-compatible chat completions endpoint.
Supports both streaming and non-streaming requests.
"""
def create(conn, params) do
with {:ok, validated} <- validate_params(params),
{:ok, request_id} <- generate_request_id() do
if validated.stream do
stream_response(conn, validated, request_id)
else
sync_response(conn, validated, request_id)
end
else
{:error, reason} ->
conn
|> put_status(400)
|> json(%{
error: %{
message: reason,
type: "invalid_request_error"
}
})
end
end
defp validate_params(params) do
  required = ["model", "messages"]
  # Report only the parameters that are actually absent
  missing = Enum.reject(required, &Map.has_key?(params, &1))

  if missing == [] do
    {:ok, %{
      model: params["model"],
      messages: params["messages"],
      temperature: params["temperature"] || 0.7,
      max_tokens: params["max_tokens"] || 2048,
      stream: params["stream"] || false,
      stop: params["stop"],
      top_p: params["top_p"] || 1.0
    }}
  else
    {:error, "Missing required parameters: #{Enum.join(missing, ", ")}"}
  end
end
defp generate_request_id do
{:ok, "chatcmpl-" <> Base.encode16(:crypto.strong_rand_bytes(16), case: :lower)}
end
# Non-streaming response
defp sync_response(conn, params, request_id) do
Logger.info("Sync request #{request_id} for model #{params.model}")
# Enqueue request and wait for response
case LLMQueue.enqueue_and_wait(params, timeout: 120_000) do
{:ok, response_text, usage} ->
conn
|> put_status(200)
|> json(%{
id: request_id,
object: "chat.completion",
created: System.system_time(:second),
model: params.model,
choices: [
%{
index: 0,
message: %{
role: "assistant",
content: response_text
},
finish_reason: "stop"
}
],
usage: usage
})
{:error, :timeout} ->
conn
|> put_status(504)
|> json(%{error: %{message: "Request timeout", type: "timeout_error"}})
{:error, reason} ->
Logger.error("LLM error: #{inspect(reason)}")
conn
|> put_status(500)
|> json(%{error: %{message: "Internal server error", type: "server_error"}})
end
end
# Streaming response (SSE)
defp stream_response(conn, params, request_id) do
Logger.info("Stream request #{request_id} for model #{params.model}")
conn
|> put_resp_content_type("text/event-stream")
|> put_resp_header("cache-control", "no-cache")
|> put_resp_header("connection", "keep-alive")
|> send_chunked(200)
|> stream_chunks(params, request_id)
end
defp stream_chunks(conn, params, request_id) do
# Enqueue streaming request
{:ok, stream} = LLMQueue.enqueue_stream(params)
try do
Enum.reduce_while(stream, conn, fn chunk, conn ->
case chunk do
{:data, content} ->
data = %{
id: request_id,
object: "chat.completion.chunk",
created: System.system_time(:second),
model: params.model,
choices: [
%{
index: 0,
delta: %{content: content},
finish_reason: nil
}
]
}
case send_sse_chunk(conn, data) do
{:ok, conn} -> {:cont, conn}
{:error, _} -> {:halt, conn}
end
{:done, usage} ->
# Send final chunk
final = %{
id: request_id,
object: "chat.completion.chunk",
created: System.system_time(:second),
model: params.model,
choices: [
%{
index: 0,
delta: %{},
finish_reason: "stop"
}
],
usage: usage
}
send_sse_chunk(conn, final)
send_sse_done(conn)
{:halt, conn}
{:error, reason} ->
Logger.error("Stream error: #{inspect(reason)}")
{:halt, conn}
end
end)
rescue
e ->
Logger.error("Stream exception: #{inspect(e)}")
conn
end
end
defp send_sse_chunk(conn, data) do
json_data = Jason.encode!(data)
chunk(conn, "data: #{json_data}\n\n")
end
defp send_sse_done(conn) do
chunk(conn, "data: [DONE]\n\n")
end
end
5. Queue System with GenStage
# lib/llm_gateway/llm_queue.ex
defmodule LLMGateway.LLMQueue do
@moduledoc """
Queue system for LLM requests with backpressure control.
Uses GenStage to handle concurrent request processing.
"""
use GenStage
require Logger
# Concurrency is bounded by the number of consumer processes started in the
# application supervisor (section 9); size that pool to your GPU/model capacity.
def start_link(opts) do
GenStage.start_link(__MODULE__, opts, name: __MODULE__)
end
@doc """
Enqueue a request and wait for response (sync).
"""
def enqueue_and_wait(params, opts \\ []) do
  timeout = Keyword.get(opts, :timeout, 120_000)
  request_ref = make_ref()

  # Enqueue from the calling (request) process; the consumer replies directly
  # to it. A Task wrapper adds nothing here, and Task.await with the same
  # timeout would race the receive timeout below and exit instead of
  # returning {:error, :timeout}.
  :ok = GenStage.call(__MODULE__, {:enqueue, params, self(), request_ref}, timeout)

  receive do
    {:response, ^request_ref, result} -> result
  after
    timeout -> {:error, :timeout}
  end
end
@doc """
Enqueue a streaming request.
Returns a stream that emits chunks.
"""
def enqueue_stream(params) do
{:ok, pid} = LLMGateway.StreamProducer.start_link(params)
{:ok, GenStage.stream([{pid, []}])}
end
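
@doc """
Current queue depth.

An added helper, polled by the Telemetry module in the Advanced Features
section; not required for request handling.
"""
def queue_length do
  GenStage.call(__MODULE__, :queue_length)
end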
## GenStage Callbacks
@impl true
def init(_opts) do
{:producer, {:queue.new(), 0}, dispatcher: GenStage.DemandDispatcher}
end
@impl true
def handle_call({:enqueue, params, caller, ref}, _from, {queue, pending_demand}) do
  queue = :queue.in({params, caller, ref}, queue)
  # Dispatch immediately when consumers already have unmet demand; otherwise
  # the request would sit in the queue until the next handle_demand/2.
  {:noreply, events, state} = dispatch_events(queue, pending_demand, [])
  {:reply, :ok, events, state}
end
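
@impl true
def handle_call(:queue_length, _from, {queue, _pending_demand} = state) do
  # Companion to the queue_length/0 helper above
  {:reply, :queue.len(queue), [], state}
end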
@impl true
def handle_demand(incoming_demand, {queue, pending_demand}) do
dispatch_events(queue, incoming_demand + pending_demand, [])
end
defp dispatch_events(queue, 0, events) do
{:noreply, Enum.reverse(events), {queue, 0}}
end
defp dispatch_events(queue, demand, events) do
case :queue.out(queue) do
{{:value, item}, queue} ->
dispatch_events(queue, demand - 1, [item | events])
{:empty, queue} ->
{:noreply, Enum.reverse(events), {queue, demand}}
end
end
end
6. Queue Consumer (Worker)
# lib/llm_gateway/llm_consumer.ex
defmodule LLMGateway.LLMConsumer do
@moduledoc """
Consumer that processes LLM requests from the queue.
"""
use GenStage
require Logger
alias LLMGateway.LLMClient
def start_link(opts) do
GenStage.start_link(__MODULE__, opts)
end
@impl true
def init(_opts) do
  # max_demand: 1 means each consumer pulls one request at a time, so total
  # concurrency equals the number of consumers started by the supervisor.
  {:consumer, %{}, subscribe_to: [{LLMGateway.LLMQueue, max_demand: 1, min_demand: 0}]}
end

@impl true
def handle_events(events, _from, state) do
  # Process synchronously: spawning a Task here would let the consumer demand
  # more work immediately and defeat the concurrency bound above.
  for {params, caller, ref} <- events do
    result = process_request(params)
    send(caller, {:response, ref, result})
  end

  {:noreply, [], state}
end
defp process_request(params) do
start_time = System.monotonic_time(:millisecond)
result = LLMClient.generate(params)
duration = System.monotonic_time(:millisecond) - start_time
Logger.info("Request completed in #{duration}ms")
result
end
end
7. LLM Client (Connects to Docker Container)
# lib/llm_gateway/llm_client.ex
defmodule LLMGateway.LLMClient do
@moduledoc """
Client for communicating with LLM running in Docker.
Supports multiple backend types: Ollama, vLLM, TGI, etc.
"""
require Logger
@doc """
Generate completion from LLM.
"""
def generate(params) do
backend = Application.get_env(:llm_gateway, :llm_backend, :ollama)
case backend do
:ollama -> generate_ollama(params)
:vllm -> generate_vllm(params)
:tgi -> generate_tgi(params)
:custom -> generate_custom(params)
end
end
## Ollama Backend
defp generate_ollama(params) do
url = get_backend_url() <> "/api/chat"
payload = %{
model: params.model,
messages: params.messages,
stream: false,
options: %{
temperature: params.temperature,
num_predict: params.max_tokens,
top_p: params.top_p,
stop: params.stop
}
}
headers = [{"Content-Type", "application/json"}]
body = Jason.encode!(payload)
case Finch.build(:post, url, headers, body)
|> Finch.request(LLMGateway.Finch, receive_timeout: 120_000) do
{:ok, %{status: 200, body: response_body}} ->
case Jason.decode(response_body) do
{:ok, %{"message" => %{"content" => content}}} ->
usage = extract_usage_ollama(response_body)
{:ok, content, usage}
{:error, _} = error ->
Logger.error("Failed to decode Ollama response: #{inspect(error)}")
{:error, :decode_error}
end
{:ok, %{status: status, body: body}} ->
Logger.error("Ollama returned #{status}: #{body}")
{:error, {:http_error, status}}
{:error, reason} ->
Logger.error("Ollama request failed: #{inspect(reason)}")
{:error, reason}
end
end
## vLLM Backend
defp generate_vllm(params) do
url = get_backend_url() <> "/v1/chat/completions"
payload = %{
model: params.model,
messages: params.messages,
temperature: params.temperature,
max_tokens: params.max_tokens,
top_p: params.top_p,
stop: params.stop
}
headers = [{"Content-Type", "application/json"}]
body = Jason.encode!(payload)
case Finch.build(:post, url, headers, body)
|> Finch.request(LLMGateway.Finch, receive_timeout: 120_000) do
{:ok, %{status: 200, body: response_body}} ->
case Jason.decode(response_body) do
{:ok, %{"choices" => [%{"message" => %{"content" => content}} | _], "usage" => usage}} ->
{:ok, content, format_usage(usage)}
{:error, _} ->
  {:error, :decode_error}
end
{:ok, %{status: status}} ->
{:error, {:http_error, status}}
{:error, reason} ->
{:error, reason}
end
end
## Text Generation Inference (TGI) Backend
defp generate_tgi(params) do
url = get_backend_url() <> "/generate"
# Convert chat messages to prompt
prompt = messages_to_prompt(params.messages)
payload = %{
  inputs: prompt,
  parameters: %{
    temperature: params.temperature,
    max_new_tokens: params.max_tokens,
    top_p: params.top_p,
    stop: params.stop || [],
    # Ask TGI to include the "details" object the decode below expects
    details: true
  }
}
headers = [{"Content-Type", "application/json"}]
body = Jason.encode!(payload)
case Finch.build(:post, url, headers, body)
|> Finch.request(LLMGateway.Finch, receive_timeout: 120_000) do
{:ok, %{status: 200, body: response_body}} ->
case Jason.decode(response_body) do
{:ok, %{"generated_text" => content, "details" => details}} ->
usage = extract_usage_tgi(details)
{:ok, String.trim(content), usage}
{:error, _} ->
{:error, :decode_error}
end
{:ok, %{status: status}} ->
{:error, {:http_error, status}}
{:error, reason} ->
{:error, reason}
end
end
## Custom Backend (Your implementation)
defp generate_custom(_params) do
  # Implement your custom protocol here (gRPC, raw TCP, or anything else)
  # against get_backend_url/0, and return {:ok, content, usage} or
  # {:error, reason} like the other backends do.
  {:error, :not_implemented}
end
## Helpers
defp get_backend_url do
Application.get_env(:llm_gateway, :llm_backend_url, "http://localhost:11434")
end
defp messages_to_prompt(messages) do
Enum.map_join(messages, "\n", fn msg ->
role = String.capitalize(msg["role"])
"#{role}: #{msg["content"]}"
end)
end
defp extract_usage_ollama(decoded) do
  # Ollama reports token counts as prompt_eval_count / eval_count on the
  # final (non-streaming) response object
  prompt = decoded["prompt_eval_count"] || 0
  completion = decoded["eval_count"] || 0

  %{
    prompt_tokens: prompt,
    completion_tokens: completion,
    total_tokens: prompt + completion
  }
end
defp extract_usage_tgi(details) do
  # In TGI's details object, "prefill" is a list of prompt tokens (not a
  # count), while "generated_tokens" is already an integer
  prompt_tokens = length(details["prefill"] || [])
  completion_tokens = details["generated_tokens"] || 0

  %{
    prompt_tokens: prompt_tokens,
    completion_tokens: completion_tokens,
    total_tokens: prompt_tokens + completion_tokens
  }
end
defp format_usage(usage) do
%{
prompt_tokens: usage["prompt_tokens"] || 0,
completion_tokens: usage["completion_tokens"] || 0,
total_tokens: usage["total_tokens"] || 0
}
end
end
8. Stream Producer (for SSE)
# lib/llm_gateway/stream_producer.ex
defmodule LLMGateway.StreamProducer do
use GenStage
require Logger
alias LLMGateway.LLMClient
def start_link(params) do
GenStage.start_link(__MODULE__, params)
end
@impl true
def init(params) do
  {:producer, %{params: params, started: false}}
end

@impl true
def handle_demand(demand, %{started: false} = state) when demand > 0 do
  # Capture the producer pid before spawning: inside the spawned fun,
  # self() would refer to the spawned process, not this producer.
  producer = self()
  spawn_link(fn -> stream_from_llm(producer, state.params) end)
  {:noreply, [], %{state | started: true}}
end

def handle_demand(_demand, state) do
  # Already streaming; chunks arrive via handle_info/2 below
  {:noreply, [], state}
end
@impl true
def handle_info({:chunk, content}, state) do
{:noreply, [{:data, content}], state}
end
def handle_info({:done, usage}, state) do
{:noreply, [{:done, usage}], state}
end
def handle_info({:error, reason}, state) do
{:noreply, [{:error, reason}], state}
end
defp stream_from_llm(producer_pid, params) do
# Stream from LLM backend
backend = Application.get_env(:llm_gateway, :llm_backend, :ollama)
url = Application.get_env(:llm_gateway, :llm_backend_url, "http://localhost:11434")
case backend do
:ollama ->
stream_ollama(producer_pid, url <> "/api/chat", params)
:vllm ->
stream_vllm(producer_pid, url <> "/v1/chat/completions", params)
_ ->
send(producer_pid, {:error, :streaming_not_supported})
end
end
defp stream_ollama(producer_pid, url, params) do
payload = %{
model: params.model,
messages: params.messages,
stream: true,
options: %{
temperature: params.temperature,
num_predict: params.max_tokens
}
}
headers = [{"Content-Type", "application/json"}]
body = Jason.encode!(payload)
request = Finch.build(:post, url, headers, body)
Finch.stream(request, LLMGateway.Finch, nil, fn
{:status, _status}, acc -> acc
{:headers, _headers}, acc -> acc
{:data, data}, acc ->
# Ollama streams newline-delimited JSON (one object per line), not SSE.
# Note: a network chunk can split a JSON line in two; production code
# should carry partial lines over in the accumulator rather than using
# this simple per-chunk split.
String.split(data, "\n", trim: true)
|> Enum.each(fn line ->
case Jason.decode(line) do
{:ok, %{"message" => %{"content" => content}}} when content != "" ->
send(producer_pid, {:chunk, content})
{:ok, %{"done" => true}} ->
send(producer_pid, {:done, %{prompt_tokens: 0, completion_tokens: 0, total_tokens: 0}})
_ ->
:ok
end
end)
acc
end)
end
defp stream_vllm(producer_pid, url, params) do
payload = %{
model: params.model,
messages: params.messages,
temperature: params.temperature,
max_tokens: params.max_tokens,
stream: true
}
headers = [{"Content-Type", "application/json"}]
body = Jason.encode!(payload)
request = Finch.build(:post, url, headers, body)
Finch.stream(request, LLMGateway.Finch, nil, fn
{:status, _}, acc -> acc
{:headers, _}, acc -> acc
{:data, data}, acc ->
String.split(data, "\n", trim: true)
|> Enum.each(fn line ->
case String.trim_leading(line, "data: ") do
"[DONE]" ->
send(producer_pid, {:done, %{prompt_tokens: 0, completion_tokens: 0, total_tokens: 0}})
json_str ->
case Jason.decode(json_str) do
{:ok, %{"choices" => [%{"delta" => %{"content" => content}} | _]}} when content != "" ->
send(producer_pid, {:chunk, content})
_ ->
:ok
end
end
end)
acc
end)
end
end
9. Application Supervisor
# lib/llm_gateway/application.ex
defmodule LLMGateway.Application do
use Application
@impl true
def start(_type, _args) do
# Create ETS table for rate limiting
:ets.new(:rate_limits, [:set, :public, :named_table])
children = [
# HTTP client pool
{Finch, name: LLMGateway.Finch, pools: %{
default: [size: 32, count: 4]
}},
# Queue system
LLMGateway.LLMQueue,
# Start one consumer per allowed concurrent request (wired to the
# :max_concurrent_requests setting from config)
Enum.map(1..Application.get_env(:llm_gateway, :max_concurrent_requests, 4), fn i ->
  Supervisor.child_spec(
    {LLMGateway.LLMConsumer, []},
    id: {LLMGateway.LLMConsumer, i}
  )
end),
# Phoenix endpoint
LLMGatewayWeb.Endpoint
]
|> List.flatten()
opts = [strategy: :one_for_one, name: LLMGateway.Supervisor]
Supervisor.start_link(children, opts)
end
end
10. Configuration
# config/config.exs
import Config
config :llm_gateway,
# LLM Backend configuration
llm_backend: :ollama, # :ollama, :vllm, :tgi, :custom
llm_backend_url: System.get_env("LLM_BACKEND_URL", "http://localhost:11434"),
# API Keys (load from env or secure store)
api_keys: [
{"sk-test-key-1", %{user: "user1", rate_limit: 60}},
{"sk-test-key-2", %{user: "user2", rate_limit: 120}}
],
# Queue settings
max_concurrent_requests: String.to_integer(System.get_env("MAX_CONCURRENT", "4")),
request_timeout: 120_000
config :llm_gateway, LLMGatewayWeb.Endpoint,
url: [host: "localhost"],
http: [port: 4000],
server: true,
secret_key_base: "your-secret-key-base" # replace; generate with: mix phx.gen.secret
# Production config
import_config "#{config_env()}.exs"
# config/prod.exs
import Config

# Config files are plain scripts, so defp cannot be used here; build the key
# list inline. For releases, prefer config/runtime.exs so the env vars are
# read at boot rather than at build time.
api_keys =
  case System.get_env("API_KEYS_JSON") do
    nil -> []
    json -> json |> Jason.decode!() |> Map.to_list()
  end

config :llm_gateway,
  llm_backend: String.to_atom(System.get_env("LLM_BACKEND", "ollama")),
  llm_backend_url: System.fetch_env!("LLM_BACKEND_URL"),
  api_keys: api_keys
API Endpoint Specification
Authentication
All requests must include an API key in the Authorization header:
Authorization: Bearer sk-your-api-key-here
Chat Completions (Non-Streaming)
Request:
POST /v1/chat/completions
Content-Type: application/json
{
"model": "llama3.2:latest",
"messages": [
{"role": "system", "content": "You are a helpful assistant."},
{"role": "user", "content": "Hello!"}
],
"temperature": 0.7,
"max_tokens": 2048
}
Response:
{
"id": "chatcmpl-abc123",
"object": "chat.completion",
"created": 1699999999,
"model": "llama3.2:latest",
"choices": [
{
"index": 0,
"message": {
"role": "assistant",
"content": "Hello! How can I help you today?"
},
"finish_reason": "stop"
}
],
"usage": {
"prompt_tokens": 20,
"completion_tokens": 10,
"total_tokens": 30
}
}
Chat Completions (Streaming)
Request:
POST /v1/chat/completions
Content-Type: application/json
{
"model": "llama3.2:latest",
"messages": [...],
"stream": true
}
Response (SSE):
data: {"id":"chatcmpl-abc123","object":"chat.completion.chunk","created":1699999999,"model":"llama3.2:latest","choices":[{"index":0,"delta":{"content":"Hello"},"finish_reason":null}]}
data: {"id":"chatcmpl-abc123","object":"chat.completion.chunk","created":1699999999,"model":"llama3.2:latest","choices":[{"index":0,"delta":{"content":"!"},"finish_reason":null}]}
data: {"id":"chatcmpl-abc123","object":"chat.completion.chunk","created":1699999999,"model":"llama3.2:latest","choices":[{"index":0,"delta":{},"finish_reason":"stop"}],"usage":{"prompt_tokens":20,"completion_tokens":10,"total_tokens":30}}
data: [DONE]
Configuration & Deployment
Docker Compose Setup
# docker-compose.yml
version: '3.8'
services:
# Ollama LLM Backend
ollama:
image: ollama/ollama:latest
ports:
- "11434:11434"
volumes:
- ollama_data:/root/.ollama
environment:
- OLLAMA_NUM_PARALLEL=4
- OLLAMA_MAX_LOADED_MODELS=2
deploy:
resources:
reservations:
devices:
- driver: nvidia
count: 1
capabilities: [gpu]
# Elixir Gateway
llm_gateway:
build: .
ports:
- "4000:4000"
environment:
- LLM_BACKEND=ollama
- LLM_BACKEND_URL=http://ollama:11434
- MAX_CONCURRENT=4
- API_KEYS_JSON={"sk-test":{"user":"test","rate_limit":60}}
depends_on:
- ollama
restart: unless-stopped
volumes:
ollama_data:
Dockerfile
# Dockerfile
FROM elixir:1.15-alpine AS builder
WORKDIR /app
# Build the release in prod mode; without this, compile and release run in :dev
ENV MIX_ENV=prod
# Install build dependencies
RUN apk add --no-cache build-base git
# Install hex and rebar
RUN mix local.hex --force && mix local.rebar --force
# Copy mix files, then config, so dependencies compile with prod settings
COPY mix.exs mix.lock ./
COPY config config
RUN mix deps.get --only prod
RUN mix deps.compile
# Copy application code
COPY . .
# Compile and build release
RUN mix compile
RUN mix release
# Production image
FROM alpine:3.18
RUN apk add --no-cache openssl ncurses-libs libstdc++
WORKDIR /app
COPY --from=builder /app/_build/prod/rel/llm_gateway ./
ENV HOME=/app
ENV MIX_ENV=prod
CMD ["bin/llm_gateway", "start"]Client Usage with LangChain
Using with GettextTranslator
# config/config.exs
config :gettext_translator, GettextTranslator,
endpoint: LangChain.ChatModels.ChatOpenAI,
endpoint_model: "llama3.2:latest",
endpoint_temperature: 0,
endpoint_config: %{
"openai_key" => "sk-your-api-key",
"openai_endpoint" => "http://your-gateway.com:4000/v1/chat/completions"
},
persona: "Professional translator",
style: "Casual",
ignored_languages: ["en"]
Using with LangChain Directly
alias LangChain.ChatModels.ChatOpenAI
alias LangChain.Chains.LLMChain
alias LangChain.Message
# Configure to use your gateway
Application.put_env(:langchain, :openai_key, "sk-your-api-key")
Application.put_env(:langchain, :openai_endpoint, "http://your-gateway.com:4000/v1/chat/completions")
# Create LLM
{:ok, llm} = ChatOpenAI.new(%{
model: "llama3.2:latest",
temperature: 0.7
})
# Create and run the chain. LLMChain.new!/1 returns the chain struct directly,
# so it can be piped; piping the {:ok, chain} tuple from new/1 into
# add_message/2 would crash.
{:ok, chain} =
  %{llm: llm}
  |> LLMChain.new!()
  |> LLMChain.add_message(Message.new_user!("Translate 'hello' to Spanish"))
  |> LLMChain.run()
# Get response
response = LangChain.Message.ContentPart.parts_to_string(chain.last_message.content)
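LangChain covers the request/response flow; to consume the gateway's SSE stream directly, here is a raw Finch sketch (it reuses the Finch pool and API key names from earlier in this guide, and, like the gateway's own streaming code, real code should buffer partial SSE lines):
payload = %{
  model: "llama3.2:latest",
  messages: [%{role: "user", content: "Hello!"}],
  stream: true
}

request =
  Finch.build(
    :post,
    "http://your-gateway.com:4000/v1/chat/completions",
    [{"authorization", "Bearer sk-your-api-key"}, {"content-type", "application/json"}],
    Jason.encode!(payload)
  )

Finch.stream(request, LLMGateway.Finch, nil, fn
  {:data, data}, acc ->
    # Each SSE event is a "data: {...}" line; print the content deltas
    for "data: " <> json <- String.split(data, "\n", trim: true), json != "[DONE]" do
      with {:ok, %{"choices" => [%{"delta" => %{"content" => content}} | _]}} <- Jason.decode(json) do
        IO.write(content)
      end
    end

    acc

  _event, acc ->
    acc
end)
Advanced Features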
1. Model Management
# lib/llm_gateway_web/controllers/models_controller.ex
defmodule LLMGatewayWeb.ModelsController do
use Phoenix.Controller
def index(conn, _params) do
models = LLMGateway.ModelRegistry.list_models()
json(conn, %{
object: "list",
data: Enum.map(models, &format_model/1)
})
end
defp format_model(model) do
%{
id: model.name,
object: "model",
created: model.created_at,
owned_by: "local",
capabilities: model.capabilities
}
end
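
  # The router also routes GET /v1/models/:model_id here; a minimal show/2
  # sketch (assumes ModelRegistry.get_model/1 returns {:ok, model} | :error):
  def show(conn, %{"model_id" => model_id}) do
    case LLMGateway.ModelRegistry.get_model(model_id) do
      {:ok, model} ->
        json(conn, format_model(model))

      :error ->
        conn
        |> Plug.Conn.put_status(404)
        |> json(%{error: %{message: "Model '#{model_id}' not found", type: "invalid_request_error"}})
    end
  end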
end
2. Request Prioritization
# Add priority to the queue (a sketch: PriorityQueue is a hypothetical module,
# e.g. a pairing heap or a map of :queue instances keyed by priority level)
def enqueue_with_priority(params, priority \\ :normal) do
  GenStage.call(__MODULE__, {:enqueue, params, priority, self(), make_ref()})
end

# In init/1, swap the FIFO :queue for the priority queue
{:producer, {PriorityQueue.new(), 0}, dispatcher: GenStage.DemandDispatcher}
3. Metrics & Monitoring
# lib/llm_gateway/telemetry.ex
defmodule LLMGateway.Telemetry do
use Supervisor
import Telemetry.Metrics
def start_link(arg) do
Supervisor.start_link(__MODULE__, arg, name: __MODULE__)
end
def init(_arg) do
children = [
{:telemetry_poller, measurements: periodic_measurements(), period: 10_000}
]
Supervisor.init(children, strategy: :one_for_one)
end
def metrics do
[
# Queue metrics
last_value("llm_gateway.queue.length"),
last_value("llm_gateway.queue.pending_demand"),
# Request metrics
counter("llm_gateway.request.count"),
distribution("llm_gateway.request.duration", unit: {:native, :millisecond}),
# Error metrics
counter("llm_gateway.error.count")
]
end
def measure_queue_length do
  # Pairs with last_value("llm_gateway.queue.length") above, using the
  # LLMQueue.queue_length/0 helper from section 5. Remember to add this
  # module to the application's supervision tree.
  :telemetry.execute([:llm_gateway, :queue], %{length: LLMGateway.LLMQueue.queue_length()}, %{})
end

defp periodic_measurements do
  [{__MODULE__, :measure_queue_length, []}]
end
end
4. Caching Layer
# lib/llm_gateway/cache.ex
defmodule LLMGateway.Cache do
use GenServer
def start_link(opts) do
GenServer.start_link(__MODULE__, opts, name: __MODULE__)
end
def get(key) do
GenServer.call(__MODULE__, {:get, key})
end
def put(key, value, ttl \\ 3600) do
GenServer.cast(__MODULE__, {:put, key, value, ttl})
end
@impl true
def init(_opts) do
  :ets.new(:llm_cache, [:set, :public, :named_table])
  {:ok, %{}}
end

@impl true
def handle_call({:get, key}, _from, state) do
  # Hits are only valid until their TTL expires
  reply =
    case :ets.lookup(:llm_cache, key) do
      [{^key, value, expires_at}] ->
        if System.system_time(:second) < expires_at, do: {:ok, value}, else: :miss

      [] ->
        :miss
    end

  {:reply, reply, state}
end

@impl true
def handle_cast({:put, key, value, ttl}, state) do
  :ets.insert(:llm_cache, {key, value, System.system_time(:second) + ttl})
  {:noreply, state}
end
end
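With those handlers in place, caching a deterministic completion might look like this (a sketch; the hash key and the zeroed usage for cache hits are illustrative choices, not part of the modules above):
key = :erlang.phash2({params.model, params.messages, params.temperature})

case LLMGateway.Cache.get(key) do
  {:ok, cached} ->
    {:ok, cached, %{prompt_tokens: 0, completion_tokens: 0, total_tokens: 0}}

  :miss ->
    with {:ok, text, usage} <- LLMGateway.LLMClient.generate(params) do
      LLMGateway.Cache.put(key, text)
      {:ok, text, usage}
    end
end
Summary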
This Elixir-based LLM gateway provides:
✅ OpenAI-compatible API - Works seamlessly with LangChain
✅ Queue mechanics - GenStage-based backpressure control
✅ Concurrency control - Prevents overloading your GPU
✅ Rate limiting - Per-API-key rate limits
✅ Streaming support - SSE for real-time responses
✅ Multiple backends - Ollama, vLLM, TGI support
✅ Authentication - API key-based auth
✅ Fault tolerance - Supervisor trees for reliability
✅ Metrics ready - Telemetry integration
Deploy it alongside your Docker LLM container and expose it to your users with API keys!