dream_http_client/client
Type-safe HTTP client with recording + streaming support
Gleam doesn’t ship with an HTTPS client, so this module wraps Erlang’s
battle‑hardened httpc and adds a friendly builder API, streaming helpers,
and optional record/playback via dream_http_client/recorder.
Quick Example — blocking request
import dream_http_client/client.{add_header, host, path, send}
pub fn call_api(token: String) -> Result(String, String) {
client.new()
|> host("api.example.com")
|> path("/users/123")
|> add_header("Authorization", "Bearer " <> token)
|> send()
}
Execution modes
You can execute the same ClientRequest in three ways:
- Blocking: send() returns the full response body.
- Pull streaming: stream_yielder() returns a yielder.Yielder of chunks.
- Callback streaming: start_stream() spawns a stream process and calls your callbacks (on_stream_*) as events arrive.

The “right” choice is mostly about concurrency:

- Use send() for normal JSON APIs.
- Use stream_yielder() for scripts/one-offs where blocking is fine.
- Use start_stream() when you need non-blocking streaming in OTP code.
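All three executors accept the same built request. A minimal sketch (the request itself is assumed to be configured elsewhere; the no-op chunk callback is only a placeholder):

```gleam
import dream_http_client/client

pub fn run_all_three(request: client.ClientRequest) -> Nil {
  // Blocking: returns Result(String, String) with the full body.
  let _body = client.send(request)

  // Pull streaming: a yielder of Result chunks, consumed lazily.
  let _chunks = client.stream_yielder(request)

  // Callback streaming: returns immediately with a StreamHandle;
  // chunks are delivered to the callback in a separate process.
  let _handle =
    request
    |> client.on_stream_chunk(fn(_data) { Nil })
    |> client.start_stream()

  Nil
}
```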
Recording and playback
Attach a recorder with recorder() to record real HTTP traffic to disk, or
to play back recordings without network calls.
import dream_http_client/client.{host, path, recorder, send}
import dream_http_client/recorder.{directory, mode, start}
let assert Ok(rec) =
recorder.new()
|> directory("mocks/api")
|> mode("record")
|> start()
let assert Ok(body) =
client.new()
|> host("api.example.com")
|> path("/users/123")
|> recorder(rec)
|> send()
Inspecting requests
ClientRequest is opaque to keep the public API stable; use the get_*
functions for logging/testing.
import dream_http_client/client.{get_host, get_path, host, path}
import gleam/io
let req = client.new() |> host("api.example.com") |> path("/users/123")
io.println("Calling: " <> get_host(req) <> get_path(req))
Types
HTTP client request configuration
Represents a complete HTTP request with all its components. Use the builder
pattern with functions like host(), path(), method(), etc. to configure
the request, then execute it with send(), stream_yielder(), or
start_stream() depending on whether you want a
blocking, pull-streaming, or callback-streaming API.
Fields
- method: The HTTP method (GET, POST, etc.)
- scheme: The protocol (HTTP or HTTPS)
- host: The server hostname
- port: Optional port number (defaults to 80 for HTTP, 443 for HTTPS)
- path: The request path (e.g., “/api/users”)
- query: Optional query string (e.g., “?page=1&limit=10”)
- headers: List of header name-value pairs
- body: The request body as a string
- timeout: Optional timeout in milliseconds (defaults to 30000ms)
- recorder: Optional recorder for request/response recording and playback
The type is opaque to ensure API stability. Use new with builder functions
to construct requests, and the getter functions to inspect request properties.
pub opaque type ClientRequest
HTTP header
Represents a single HTTP header with a name and value. Used throughout the module for type-safe header handling.
Fields
- name: Header name (e.g., “Content-Type”, “Authorization”)
- value: Header value (e.g., “application/json”, “Bearer token”)
Usage
Headers are constructed automatically by builder functions like add_header(),
but you’ll work with this type when inspecting headers:
let headers = client.get_headers(request)
case headers {
[Header(name, value), ..] -> {
io.println(name <> ": " <> value)
}
[] -> io.println("No headers")
}
Notes
- Header names are case-sensitive as stored, but HTTP treats them case-insensitively
- Duplicate header names are allowed (e.g., multiple Set-Cookie headers)
- Headers are stored in the order they were added
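Because add_header() prepends and duplicates are preserved, two Set-Cookie headers both survive, with the most recently added first. A small sketch:

```gleam
import dream_http_client/client.{Header}

pub fn duplicate_headers() -> List(Header) {
  client.new()
  |> client.add_header("Set-Cookie", "a=1")
  |> client.add_header("Set-Cookie", "b=2")
  |> client.get_headers()
  // -> [Header("Set-Cookie", "b=2"), Header("Set-Cookie", "a=1")]
}
```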
pub type Header {
Header(name: String, value: String)
}
Constructors
- Header(name: String, value: String)
Opaque request identifier for internal streaming
A unique identifier for an active HTTP stream. You will usually only see
a RequestId inside StreamMessage values processed by the stream process
created by start_stream(), or when using the low-level cancel_stream().
Usage
Most users never need to construct or store RequestId. Prefer controlling
streams via StreamHandle (cancel_stream_handle, is_stream_active,
await_stream).
Examples
// RequestIds are returned by internal streaming machinery.
// Prefer StreamHandle for user code.
Notes
- RequestId values are opaque - do not rely on their internal structure
- RequestIds are unique per VM instance but not stable across restarts
- Use pattern matching or equality comparison to identify streams
pub opaque type RequestId
Handle to a running HTTP stream
Opaque handle returned from start_stream() representing a stream running in
a dedicated BEAM process. Use this handle to control the stream lifecycle.
Lifecycle Management
- await_stream(handle) - Wait for stream to complete
- cancel_stream_handle(handle) - Stop the stream early
- is_stream_active(handle) - Check if still running
Process Isolation
Each stream runs in its own BEAM process, which means:
- Multiple streams run concurrently without blocking
- Stream crashes don’t affect your application
- Your process mailbox stays clean (HTTP messages go to stream process)
- Callbacks execute in the stream process, not your process
Example
// Start stream
let assert Ok(stream) = client.start_stream(request)
// Check status
case client.is_stream_active(stream) {
True -> io.println("Still streaming...")
False -> io.println("Completed")
}
// Wait for completion
client.await_stream(stream)
// Or cancel early
client.cancel_stream_handle(stream)
pub opaque type StreamHandle
Stream message types emitted by internal streaming machinery
start_stream() runs a stream loop in a dedicated process; that process
receives and decodes httpc messages into these variants.
Message Flow
- StreamStart - Headers received, body chunks coming
- Chunk - Zero or more data chunks
- StreamEnd or StreamError - Stream completed normally or failed
- DecodeError - FFI layer corruption (rare, should be reported as a bug)
DecodeError
DecodeError indicates the Erlang→Gleam FFI boundary received a malformed
message from httpc. This is not a normal HTTP error - it means either:
- Erlang/OTP version incompatibility with this library
- Memory corruption or other serious runtime issue
- A bug in this library’s FFI code
What to do: If you see a DecodeError, please report it as a bug at
https://github.com/TrustBound/dream/issues with the full error message.
The error message includes debug information to help diagnose the issue.
Unlike StreamError which has a RequestId, DecodeError does not because
the request ID itself could not be decoded from the corrupted message.
pub type StreamMessage {
StreamStart(request_id: RequestId, headers: List(Header))
Chunk(request_id: RequestId, data: BitArray)
StreamEnd(request_id: RequestId, headers: List(Header))
StreamError(request_id: RequestId, reason: String)
DecodeError(reason: String)
}
Constructors
- StreamStart(request_id: RequestId, headers: List(Header))
  Stream started, headers received
- Chunk(request_id: RequestId, data: BitArray)
  Data chunk received
- StreamEnd(request_id: RequestId, headers: List(Header))
  Stream completed successfully
- StreamError(request_id: RequestId, reason: String)
  Stream failed with error (connection drop, timeout, HTTP error, etc.)
- DecodeError(reason: String)
  Failed to decode stream message from Erlang FFI (indicates library bug)
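For advanced integrations that consume these messages directly, an exhaustive case expression covers every variant. A sketch:

```gleam
import dream_http_client/client.{
  type StreamMessage, Chunk, DecodeError, StreamEnd, StreamError, StreamStart,
}

pub fn describe(message: StreamMessage) -> String {
  case message {
    StreamStart(_id, _headers) -> "headers received, chunks coming"
    Chunk(_id, _data) -> "data chunk"
    StreamEnd(_id, _trailers) -> "completed successfully"
    StreamError(_id, reason) -> "stream failed: " <> reason
    DecodeError(reason) -> "FFI decode failure (report as a bug): " <> reason
  }
}
```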
Values
pub fn add_header(
client_request: ClientRequest,
name: String,
value: String,
) -> ClientRequest
Add a header to the request
Adds a single header to the existing headers list without replacing them. The new header is prepended to the list, so it will take precedence if there’s a duplicate header name.
Parameters
- client_request: The request to modify
- name: The header name (e.g., “Authorization”, “Content-Type”)
- value: The header value
Returns
A new ClientRequest with the header added.
Example
import dream_http_client/client
client.new()
|> client.add_header("Authorization", "Bearer " <> token)
|> client.add_header("Content-Type", "application/json")
pub fn await_stream(handle: StreamHandle) -> Nil
Wait for a stream to complete
Blocks until the stream process exits. Use this when you need to wait for the stream to finish before continuing.
Returns Nil when the stream completes.
For timeout behavior, use cancel_stream_handle() with a timer, or implement your own timeout logic.
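One way to sketch such timeout logic is a watchdog process that cancels the handle after a deadline. This is illustrative rather than part of this library, and assumes the gleam_erlang process API (process.spawn, process.sleep):

```gleam
import dream_http_client/client
import gleam/erlang/process

pub fn await_with_deadline(handle: client.StreamHandle, timeout_ms: Int) -> Nil {
  // Watchdog: cancels the stream if it outlives the deadline.
  // cancel_stream_handle() is documented as safe to call repeatedly,
  // so racing with normal completion is harmless.
  let _watchdog =
    process.spawn(fn() {
      process.sleep(timeout_ms)
      client.cancel_stream_handle(handle)
    })
  client.await_stream(handle)
}
```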
Example
let assert Ok(stream) = client.start_stream(request)
client.await_stream(stream)
io.println("Stream finished")
pub fn body(
client_request: ClientRequest,
body_value: String,
) -> ClientRequest
Set the body for the request
Sets the request body as a string. Typically used for POST, PUT, and PATCH requests. For JSON, serialize your data first.
Parameters
- client_request: The request to modify
- body_value: The request body as a string
Returns
A new ClientRequest with the body updated.
Example
import dream_http_client/client
import gleam/json
let json_body = json.object([
#("name", json.string("Alice")),
#("email", json.string("alice@example.com")),
])
client.new()
|> client.method(http.Post)
|> client.body(json.to_string(json_body))
pub fn cancel_stream(request_id: RequestId) -> Nil
Cancel an active streaming request (low-level API)
Cancels an HTTP stream given its RequestId.
Note: Most users should use start_stream() and cancel_stream_handle()
instead. cancel_stream() exists primarily to support internal stream
machinery and advanced integrations.
Parameters
- request_id: The request ID for an active internal stream
Example
This is typically not called directly unless you already have a RequestId.
pub fn cancel_stream_handle(handle: StreamHandle) -> Nil
Cancel a stream started with start_stream()
Stops the stream process and cancels the underlying HTTP request. Safe to call multiple times on the same handle.
Example
let assert Ok(stream) = client.start_stream(request)
// Later:
client.cancel_stream_handle(stream)
pub fn get_body(client_request: ClientRequest) -> String
Get the body from a request
Returns the request body as a string.
Example
import dream_http_client/client
let req = client.new() |> client.body("{\"name\": \"Alice\"}")
let body = client.get_body(req)
// body == "{\"name\": \"Alice\"}"
pub fn get_headers(client_request: ClientRequest) -> List(Header)
Get the headers from a request
Returns the list of headers configured for the request.
Example
import dream_http_client/client
let req = client.new()
|> client.add_header("Authorization", "Bearer token")
|> client.add_header("Content-Type", "application/json")
let headers = client.get_headers(req)
// headers == [Header("Content-Type", "application/json"), Header("Authorization", "Bearer token")]
pub fn get_host(client_request: ClientRequest) -> String
Get the host from a request
Returns the hostname configured for the request.
Example
import dream_http_client/client
let req = client.new() |> client.host("api.example.com")
let host = client.get_host(req)
// host == "api.example.com"
pub fn get_method(client_request: ClientRequest) -> http.Method
Get the HTTP method from a request
Returns the HTTP method (GET, POST, etc.) configured for the request.
Example
import dream_http_client/client
import gleam/http.{Post}
let req = client.new() |> client.method(Post)
let method = client.get_method(req)
// method == Post
pub fn get_path(client_request: ClientRequest) -> String
Get the path from a request
Returns the request path configured for the request.
Example
import dream_http_client/client
let req = client.new() |> client.path("/api/users")
let path = client.get_path(req)
// path == "/api/users"
pub fn get_port(
client_request: ClientRequest,
) -> option.Option(Int)
Get the port from a request
Returns the optional port number configured for the request. If None, the default port for the scheme will be used (80 for HTTP, 443 for HTTPS).
Example
import dream_http_client/client
let req = client.new() |> client.port(8080)
let port = client.get_port(req)
// port == Some(8080)
pub fn get_query(
client_request: ClientRequest,
) -> option.Option(String)
Get the query string from a request
Returns the optional query string configured for the request.
Example
import dream_http_client/client
let req = client.new() |> client.query("page=1&limit=10")
let query = client.get_query(req)
// query == Some("page=1&limit=10")
pub fn get_recorder(
client_request: ClientRequest,
) -> option.Option(recorder.Recorder)
Get the recorder from a request
Returns the optional recorder attached to the request for recording or playback.
Example
import dream_http_client/client
import dream_http_client/recorder.{directory, mode, start}
let assert Ok(rec) =
recorder.new()
|> directory("mocks")
|> mode("record")
|> start()
let req = client.new() |> client.recorder(rec)
let recorder_opt = client.get_recorder(req)
// recorder_opt == Some(rec)
pub fn get_scheme(client_request: ClientRequest) -> http.Scheme
Get the URI scheme from a request
Returns the scheme (HTTP or HTTPS) configured for the request.
Example
import dream_http_client/client
import gleam/http.{Http}
let req = client.new() |> client.scheme(Http)
let scheme = client.get_scheme(req)
// scheme == Http
pub fn get_timeout(
client_request: ClientRequest,
) -> option.Option(Int)
Get the timeout from a request
Returns the optional timeout in milliseconds configured for the request. If None, the default timeout (30000ms) will be used.
Example
import dream_http_client/client
let req = client.new() |> client.timeout(5000)
let timeout = client.get_timeout(req)
// timeout == Some(5000)
pub fn headers(
client_request: ClientRequest,
headers_value: List(Header),
) -> ClientRequest
Set the headers for the request
Replaces all existing headers with the provided list. Use add_header()
to add a single header without replacing existing ones.
Parameters
- client_request: The request to modify
- headers_value: The list of Header values to set
Returns
A new ClientRequest with headers replaced.
Example
import dream_http_client/client.{Header}
client.new()
|> client.headers([
  Header("Authorization", "Bearer " <> token),
  Header("Content-Type", "application/json"),
])
pub fn host(
client_request: ClientRequest,
host_value: String,
) -> ClientRequest
Set the host for the request
Sets the server hostname or IP address. This is required for all requests.
Parameters
- client_request: The request to modify
- host_value: The hostname (e.g., “api.example.com” or “192.168.1.1”)
Returns
A new ClientRequest with the host updated.
Example
import dream_http_client/client
client.new()
|> client.host("api.example.com")
pub fn is_stream_active(handle: StreamHandle) -> Bool
Check if a stream is still active
Returns True if the stream process is still running, False otherwise.
Example
let assert Ok(stream) = client.start_stream(request)
case client.is_stream_active(stream) {
True -> io.println("Stream still running")
False -> io.println("Stream completed")
}
pub fn method(
client_request: ClientRequest,
method_value: http.Method,
) -> ClientRequest
Set the HTTP method for the request
Configures the HTTP method (GET, POST, PUT, DELETE, etc.) for the request.
Parameters
- client_request: The request to modify
- method_value: The HTTP method to use
Returns
A new ClientRequest with the method updated.
Example
import dream_http_client/client
import gleam/http
client.new()
|> client.method(http.Post)
pub fn new() -> ClientRequest
Default client request configuration
Creates a new ClientRequest with sensible defaults:
- Method: GET
- Scheme: HTTPS
- Host: “localhost”
- Port: None (uses default for scheme)
- Path: “” (empty)
- Query: None
- Headers: [] (empty)
- Body: “” (empty)
- Timeout: None (uses default 30000ms)
Use this as the starting point for building requests with the builder pattern.
Example
import dream_http_client/client.{host, method, new, path}
import gleam/http.{Get}
new()
|> host("api.example.com")
|> path("/users/123")
|> method(Get)
pub fn on_stream_chunk(
client_request: ClientRequest,
callback: fn(BitArray) -> Nil,
) -> ClientRequest
Set callback for stream chunk event
Sets a function to be called for each data chunk received from the stream. This is where you process the actual response data.
Parameters
- client_request: The request to modify
- callback: Function called with each chunk of data
Example
client.new()
|> client.host("api.openai.com")
|> client.on_stream_chunk(fn(data) {
  case bit_array.to_string(data) {
    Ok(text) -> io.print(text)
    Error(_) -> Nil
  }
})
|> client.start_stream()
pub fn on_stream_end(
client_request: ClientRequest,
callback: fn(List(Header)) -> Nil,
) -> ClientRequest
Set callback for stream end event
Sets a function to be called when a stream completes successfully. Optional - if not set, stream completion is ignored.
Parameters
- client_request: The request to modify
- callback: Function called with trailing headers when stream completes
Example
client.new()
|> client.host("api.example.com")
|> client.on_stream_end(fn(_headers) {
io.println("Stream completed")
})
|> client.start_stream()
pub fn on_stream_error(
client_request: ClientRequest,
callback: fn(String) -> Nil,
) -> ClientRequest
Set callback for stream error event
Sets a function to be called if the stream fails with an error. Handles both HTTP errors and network errors.
Parameters
- client_request: The request to modify
- callback: Function called with error reason if stream fails
Example
client.new()
|> client.host("api.example.com")
|> client.on_stream_error(fn(reason) {
io.println_error("Stream failed: " <> reason)
})
|> client.start_stream()
pub fn on_stream_start(
client_request: ClientRequest,
callback: fn(List(Header)) -> Nil,
) -> ClientRequest
Set callback for stream start event
Sets a function to be called when a stream starts and headers are received. Optional - if not set, stream start is ignored.
Parameters
- client_request: The request to modify
- callback: Function called with response headers when stream starts
Example
client.new()
|> client.host("api.example.com")
|> client.on_stream_start(fn(headers) {
io.println("Stream started with " <> int.to_string(list.length(headers)) <> " headers")
})
|> client.start_stream()
pub fn path(
client_request: ClientRequest,
path_value: String,
) -> ClientRequest
Set the path for the request
Sets the request path. Should start with “/” for absolute paths.
Parameters
- client_request: The request to modify
- path_value: The path (e.g., “/api/users” or “/api/users/123”)
Returns
A new ClientRequest with the path updated.
Example
import dream_http_client/client
client.new()
|> client.path("/api/users/123")
pub fn port(
client_request: ClientRequest,
port_value: Int,
) -> ClientRequest
Set the port for the request
Sets a custom port number. If not set, defaults to 80 for HTTP and 443 for HTTPS. Only set this if you’re using a non-standard port.
Parameters
- client_request: The request to modify
- port_value: The port number (e.g., 8080, 3000)
Returns
A new ClientRequest with the port updated.
Example
import dream_http_client/client
client.new()
|> client.host("localhost")
|> client.port(3000) // Use port 3000 instead of default
pub fn query(
client_request: ClientRequest,
query_value: String,
) -> ClientRequest
Set the query string for the request
Sets the query string portion of the URL. Do not include the leading “?”.
Parameters
- client_request: The request to modify
- query_value: The query string (e.g., “page=1&limit=10”)
Returns
A new ClientRequest with the query string updated.
Example
import dream_http_client/client
client.new()
|> client.path("/api/users")
|> client.query("page=1&limit=10")
pub fn recorder(
client_request: ClientRequest,
recorder_value: recorder.Recorder,
) -> ClientRequest
Set the recorder for the request
Attaches a recorder to the request for recording or playback.
The recorder must be started with recorder.start() before use.
Parameters
- client_request: The request to modify
- recorder_value: The recorder to attach
Returns
A new ClientRequest with the recorder attached.
Example
import dream_http_client/client.{host, recorder}
import dream_http_client/recorder.{directory, mode, start}
let assert Ok(rec) =
recorder.new()
|> directory("mocks")
|> mode("record")
|> start()
client.new() |> host("api.example.com") |> recorder(rec)
pub fn scheme(
client_request: ClientRequest,
scheme_value: http.Scheme,
) -> ClientRequest
Set the scheme (protocol) for the request
Configures whether to use HTTP or HTTPS. Defaults to HTTPS for security.
Parameters
- client_request: The request to modify
- scheme_value: The protocol scheme (http.Http or http.Https)
Returns
A new ClientRequest with the scheme updated.
Example
import dream_http_client/client
import gleam/http
client.new()
|> client.scheme(http.Http) // Use HTTP instead of HTTPS
pub fn send(
client_request: ClientRequest,
) -> Result(String, String)
Make a blocking HTTP request and get the complete response
Sends an HTTP request and collects all response chunks, returning the complete response body as a string. This is ideal for:
- JSON API responses
- Small files or documents
- Any case where you need the full response before processing
For large responses or when you need non-blocking streaming, use
stream_yielder() or start_stream() instead.
Parameters
- client_request: The configured HTTP request
Returns
- Ok(String): The complete response body as a string
- Error(String): An error message if the request failed
Example
import dream_http_client/client.{host, path, add_header, send}
import gleam/json.{decode}
import gleam/string
let result = client.new()
|> host("api.example.com")
|> path("/users/123")
|> add_header("Authorization", "Bearer " <> token)
|> send()
case result {
Ok(body) -> {
case decode(body, user_decoder) {
Ok(user) -> Ok(user)
Error(json_error) ->
Error("Invalid JSON response: " <> string.inspect(json_error))
}
}
Error(error_message) -> Error("Request failed: " <> error_message)
}
pub fn start_stream(
request: ClientRequest,
) -> Result(StreamHandle, String)
Start an HTTP stream with callback handlers
Spawns a dedicated process to handle HTTP streaming and calls your callbacks as messages arrive. This is the recommended API for streaming.
Returns a StreamHandle immediately (non-blocking). The stream runs in a
separate process, and your callbacks execute in that process.
Parameters
- request: The configured HTTP request with callbacks set via builder pattern
Returns
- Ok(StreamHandle): Stream started successfully
- Error(String): Failed to start stream
Example
let assert Ok(stream) = client.new()
|> client.host("api.openai.com")
|> client.path("/v1/chat/completions")
|> client.on_stream_chunk(fn(data) {
case bit_array.to_string(data) {
Ok(text) -> io.print(text)
Error(_) -> Nil
}
})
|> client.on_stream_error(fn(reason) {
io.println_error("Error: " <> reason)
})
|> client.start_stream()
// Later: cancel if needed
client.cancel_stream_handle(stream)
pub fn stream_yielder(
client_request: ClientRequest,
) -> yielder.Yielder(Result(bytes_tree.BytesTree, String))
Stream HTTP response chunks using a yielder
Sends an HTTP request and returns a yielder that produces chunks of the response body as they arrive from the server. This allows you to process large responses incrementally without loading the entire response into memory.
Use this for simple sequential streaming:
- AI/LLM inference endpoints (stream tokens)
- Simple file downloads
- Scripts or one-off operations
For OTP actors with concurrency, use start_stream() instead.
Error Semantics
The yielder produces Result(BytesTree, String) for each chunk:
- Ok(chunk) - Successful chunk, more may follow
- Error(reason) - Terminal error, stream is done
After an Error, the yielder immediately returns Done on the next call.
This design reflects that HTTP stream errors (timeouts, connection drops,
etc.) are not recoverable - you cannot continue reading from a broken stream.
Normal stream completion: When the stream finishes successfully, the yielder
returns Done (no more items). The stream does NOT yield an error for normal completion.
Possible error reasons (actual errors only):
- "timeout" - Request timed out
- Connection errors from httpc
Parameters
- client_request: The configured HTTP request
Returns
A Yielder that produces Result(BytesTree, String). Always check each
result - errors are terminal and mean the stream has ended.
Examples
Streaming and processing chunks as they arrive:
import dream_http_client/client.{host, path, stream_yielder}
import gleam/yielder.{each}
import gleam/bit_array
import gleam/bytes_tree
import gleam/io.{print, println_error}
import gleam/result
client.new()
|> host("api.openai.com")
|> path("/v1/chat/completions")
|> stream_yielder()
|> each(fn(chunk_result) {
  case chunk_result {
    Ok(chunk) ->
      chunk
      |> bytes_tree.to_bit_array
      |> bit_array.to_string
      |> result.unwrap("")
      |> print
    Error(error_reason) -> {
      println_error("Stream error: " <> error_reason)
      // Stream is now done, no more chunks will arrive
    }
  }
})
Collecting all chunks into a list:
import dream_http_client/client.{host, path, stream_yielder}
import gleam/yielder
import gleam/list
import gleam/bit_array
import gleam/bytes_tree
import gleam/result
import gleam/string
// The stream automatically completes when done - no need to use take()!
let chunks =
  client.new()
  |> host("example.com")
  |> path("/data")
  |> stream_yielder()
  |> yielder.to_list()
// Handle results
case result.all(chunks) {
  Ok(chunk_list) -> {
    // Concatenate all chunks
    let body =
      chunk_list
      |> list.map(bytes_tree.to_bit_array)
      |> list.map(fn(bits) { result.unwrap(bit_array.to_string(bits), "") })
      |> string.join("")
    Ok(body)
  }
  Error(error_reason) -> Error("Stream failed: " <> error_reason)
}
}
pub fn timeout(
client_request: ClientRequest,
timeout_ms: Int,
) -> ClientRequest
Set the timeout for the request in milliseconds
Sets how long to wait for a response before timing out. If not set, defaults to 30000ms (30 seconds).
Parameters
- client_request: The request to modify
- timeout_ms: Timeout duration in milliseconds
Example
import dream_http_client/client.{host, timeout}
client.new()
|> host("slow-api.example.com")
|> timeout(60_000) // 60 second timeout