dream_http_client/client

Type-safe HTTP client with recording + streaming support

Gleam doesn’t ship with an HTTPS client, so this module wraps Erlang’s battle‑hardened httpc and adds a friendly builder API, streaming helpers, and optional record/playback via dream_http_client/recorder.

Quick Example — blocking request

import dream_http_client/client.{add_header, host, path, send}

pub fn call_api(token: String) -> Result(String, String) {
  client.new()
  |> host("api.example.com")
  |> path("/users/123")
  |> add_header("Authorization", "Bearer " <> token)
  |> send()
}

Execution modes

You can execute the same ClientRequest in three ways:

The “right” choice is mostly about concurrency:

Recording and playback

Attach a recorder with recorder() to record real HTTP traffic to disk, or to play back recordings without network calls.

import dream_http_client/client.{host, path, recorder, send}
import dream_http_client/recorder.{directory, mode, start}

let assert Ok(rec) =
  recorder.new()
  |> directory("mocks/api")
  |> mode("record")
  |> start()

let assert Ok(body) =
  client.new()
  |> host("api.example.com")
  |> path("/users/123")
  |> recorder(rec)
  |> send()

Inspecting requests

ClientRequest is opaque to keep the public API stable; use the get_* functions for logging/testing.

import dream_http_client/client.{get_host, get_path, host, path}
import gleam/io

let req = client.new() |> host("api.example.com") |> path("/users/123")
io.println("Calling: " <> get_host(req) <> get_path(req))

Types

HTTP client request configuration

Represents a complete HTTP request with all its components. Use the builder pattern with functions like host(), path(), method(), etc. to configure the request, then execute it with send(), stream_yielder(), or start_stream() depending on whether you want a blocking, pull-streaming, or callback-streaming API.

Fields

  • method: The HTTP method (GET, POST, etc.)
  • scheme: The protocol (HTTP or HTTPS)
  • host: The server hostname
  • port: Optional port number (defaults to 80 for HTTP, 443 for HTTPS)
  • path: The request path (e.g., “/api/users”)
  • query: Optional query string (e.g., “?page=1&limit=10”)
  • headers: List of header name-value pairs
  • body: The request body as a string
  • timeout: Optional timeout in milliseconds (defaults to 30000ms)
  • recorder: Optional recorder for request/response recording and playback

The type is opaque to ensure API stability. Use new with builder functions to construct requests, and the getter functions to inspect request properties.

pub opaque type ClientRequest

HTTP header

Represents a single HTTP header with a name and value. Used throughout the module for type-safe header handling.

Fields

  • name: Header name (e.g., “Content-Type”, “Authorization”)
  • value: Header value (e.g., “application/json”, “Bearer token”)

Usage

Headers are constructed automatically by builder functions like add_header(), but you’ll work with this type when inspecting headers:

let headers = client.get_headers(request)
case headers {
  [Header(name, value), ..] -> {
    io.println(name <> ": " <> value)
  }
  [] -> io.println("No headers")
}

Notes

  • Header names are case-sensitive as stored, but HTTP treats them case-insensitively
  • Duplicate header names are allowed (e.g., multiple Set-Cookie headers)
  • Headers are stored in the order they were added
pub type Header {
  Header(name: String, value: String)
}

Constructors

  • Header(name: String, value: String)

Opaque request identifier for internal streaming

A unique identifier for an active HTTP stream. You will usually only see a RequestId inside StreamMessage values processed by the stream process created by start_stream(), or when using the low-level cancel_stream().

Usage

Most users never need to construct or store RequestId. Prefer controlling streams via StreamHandle (cancel_stream_handle, is_stream_active, await_stream).

Examples

// RequestIds are returned by internal streaming machinery.
// Prefer StreamHandle for user code.

Notes

  • RequestId values are opaque - do not rely on their internal structure
  • RequestIds are unique per VM instance but not stable across restarts
  • Use pattern matching or equality comparison to identify streams
pub opaque type RequestId

Handle to a running HTTP stream

Opaque handle returned from start_stream() representing a stream running in a dedicated BEAM process. Use this handle to control the stream lifecycle.

Lifecycle Management

  • await_stream(handle) - Wait for stream to complete
  • cancel_stream_handle(handle) - Stop the stream early
  • is_stream_active(handle) - Check if still running

Process Isolation

Each stream runs in its own BEAM process, which means:

  • Multiple streams run concurrently without blocking
  • Stream crashes don’t affect your application
  • Your process mailbox stays clean (HTTP messages go to stream process)
  • Callbacks execute in the stream process, not your process

Example

// Start stream
let assert Ok(stream) = client.start_stream(request)

// Check status
case client.is_stream_active(stream) {
  True -> io.println("Still streaming...")
  False -> io.println("Completed")
}

// Wait for completion
client.await_stream(stream)

// Or cancel early
client.cancel_stream_handle(stream)
pub opaque type StreamHandle

Stream message types emitted by internal streaming machinery

start_stream() runs a stream loop in a dedicated process; that process receives and decodes httpc messages into these variants.

Message Flow

  1. StreamStart - Headers received, body chunks coming
  2. Chunk - Zero or more data chunks
  3. StreamEnd or StreamError - Stream completed normally
  4. DecodeError - FFI layer corruption (rare, should be reported as a bug)

DecodeError

DecodeError indicates the Erlang→Gleam FFI boundary received a malformed message from httpc. This is not a normal HTTP error - it means either:

  • Erlang/OTP version incompatibility with this library
  • Memory corruption or other serious runtime issue
  • A bug in this library’s FFI code

What to do: If you see a DecodeError, please report it as a bug at https://github.com/TrustBound/dream/issues with the full error message. The error message includes debug information to help diagnose the issue.

Unlike StreamError which has a RequestId, DecodeError does not because the request ID itself could not be decoded from the corrupted message.

pub type StreamMessage {
  StreamStart(request_id: RequestId, headers: List(Header))
  Chunk(request_id: RequestId, data: BitArray)
  StreamEnd(request_id: RequestId, headers: List(Header))
  StreamError(request_id: RequestId, reason: String)
  DecodeError(reason: String)
}

Constructors

  • StreamStart(request_id: RequestId, headers: List(Header))

    Stream started, headers received

  • Chunk(request_id: RequestId, data: BitArray)

    Data chunk received

  • StreamEnd(request_id: RequestId, headers: List(Header))

    Stream completed successfully

  • StreamError(request_id: RequestId, reason: String)

    Stream failed with error (connection drop, timeout, HTTP error, etc.)

  • DecodeError(reason: String)

    Failed to decode stream message from Erlang FFI (indicates library bug)

Values

pub fn add_header(
  client_request: ClientRequest,
  name: String,
  value: String,
) -> ClientRequest

Add a header to the request

Adds a single header to the existing headers list without replacing them. The new header is prepended to the list, so it will take precedence if there’s a duplicate header name.

Parameters

  • client_request: The request to modify
  • name: The header name (e.g., “Authorization”, “Content-Type”)
  • value: The header value

Returns

A new ClientRequest with the header added.

Example

import dream_http_client/client

client.new()
|> client.add_header("Authorization", "Bearer " <> token)
|> client.add_header("Content-Type", "application/json")
pub fn await_stream(handle: StreamHandle) -> Nil

Wait for a stream to complete

Blocks until the stream process exits. Use this when you need to wait for the stream to finish before continuing.

Returns Ok(Nil) when stream completes.

For timeout behavior, use cancel_stream_handle() with a timer, or implement your own timeout logic.

Example

let assert Ok(stream) = client.start_stream(request)
client.await_stream(stream)
io.println("Stream finished")
pub fn body(
  client_request: ClientRequest,
  body_value: String,
) -> ClientRequest

Set the body for the request

Sets the request body as a string. Typically used for POST, PUT, and PATCH requests. For JSON, serialize your data first.

Parameters

  • client_request: The request to modify
  • body_value: The request body as a string

Returns

A new ClientRequest with the body updated.

Example

import dream_http_client/client
import gleam/json

let json_body = json.object([
  #("name", json.string("Alice")),
  #("email", json.string("alice@example.com")),
])

client.new()
|> client.method(http.Post)
|> client.body(json.to_string(json_body))
pub fn cancel_stream(request_id: RequestId) -> Nil

Cancel an active streaming request (low-level API)

Cancels an HTTP stream given its RequestId.

Note: Most users should use start_stream() and cancel_stream_handle() instead. cancel_stream() exists primarily to support internal stream machinery and advanced integrations.

Parameters

  • request_id: The request ID for an active internal stream

Example

This is typically not called directly unless you already have a RequestId.

pub fn cancel_stream_handle(handle: StreamHandle) -> Nil

Cancel a stream started with start_stream()

Stops the stream process and cancels the underlying HTTP request. Safe to call multiple times on the same handle.

Example

let assert Ok(stream) = client.start_stream(request)
// Later:
client.cancel_stream_handle(stream)
pub fn get_body(client_request: ClientRequest) -> String

Get the body from a request

Returns the request body as a string.

Example

import dream_http_client/client

let req = client.new() |> client.body("{\"name\": \"Alice\"}")
let body = client.get_body(req)
// body == "{\"name\": \"Alice\"}"
pub fn get_headers(client_request: ClientRequest) -> List(Header)

Get the headers from a request

Returns the list of headers configured for the request.

Example

import dream_http_client/client

let req = client.new()
  |> client.add_header("Authorization", "Bearer token")
  |> client.add_header("Content-Type", "application/json")
let headers = client.get_headers(req)
// headers == [Header("Content-Type", "application/json"), Header("Authorization", "Bearer token")]
pub fn get_host(client_request: ClientRequest) -> String

Get the host from a request

Returns the hostname configured for the request.

Example

import dream_http_client/client

let req = client.new() |> client.host("api.example.com")
let host = client.get_host(req)
// host == "api.example.com"
pub fn get_method(client_request: ClientRequest) -> http.Method

Get the HTTP method from a request

Returns the HTTP method (GET, POST, etc.) configured for the request.

Example

import dream_http_client/client
import gleam/http.{Post}

let req = client.new() |> client.method(Post)
let method = client.get_method(req)
// method == Post
pub fn get_path(client_request: ClientRequest) -> String

Get the path from a request

Returns the request path configured for the request.

Example

import dream_http_client/client

let req = client.new() |> client.path("/api/users")
let path = client.get_path(req)
// path == "/api/users"
pub fn get_port(
  client_request: ClientRequest,
) -> option.Option(Int)

Get the port from a request

Returns the optional port number configured for the request. If None, the default port for the scheme will be used (80 for HTTP, 443 for HTTPS).

Example

import dream_http_client/client

let req = client.new() |> client.port(8080)
let port = client.get_port(req)
// port == Some(8080)
pub fn get_query(
  client_request: ClientRequest,
) -> option.Option(String)

Get the query string from a request

Returns the optional query string configured for the request.

Example

import dream_http_client/client

let req = client.new() |> client.query("page=1&limit=10")
let query = client.get_query(req)
// query == Some("page=1&limit=10")
pub fn get_recorder(
  client_request: ClientRequest,
) -> option.Option(recorder.Recorder)

Get the recorder from a request

Returns the optional recorder attached to the request for recording or playback.

Example

import dream_http_client/client
import dream_http_client/recorder.{directory, mode, start}

let assert Ok(rec) =
  recorder.new()
  |> directory("mocks")
  |> mode("record")
  |> start()
let req = client.new() |> client.recorder(rec)
let recorder_opt = client.get_recorder(req)
// recorder_opt == Some(rec)
pub fn get_scheme(client_request: ClientRequest) -> http.Scheme

Get the URI scheme from a request

Returns the scheme (HTTP or HTTPS) configured for the request.

Example

import dream_http_client/client
import gleam/http.{Http}

let req = client.new() |> client.scheme(Http)
let scheme = client.get_scheme(req)
// scheme == Http
pub fn get_timeout(
  client_request: ClientRequest,
) -> option.Option(Int)

Get the timeout from a request

Returns the optional timeout in milliseconds configured for the request. If None, the default timeout (30000ms) will be used.

Example

import dream_http_client/client

let req = client.new() |> client.timeout(5000)
let timeout = client.get_timeout(req)
// timeout == Some(5000)
pub fn headers(
  client_request: ClientRequest,
  headers_value: List(Header),
) -> ClientRequest

Set the headers for the request

Replaces all existing headers with the provided list. Use add_header() to add a single header without replacing existing ones.

Parameters

  • client_request: The request to modify
  • headers_value: List of header tuples #(name, value)

Returns

A new ClientRequest with headers replaced.

Example

import dream_http_client/client

client.new()
|> client.headers([
  #("Authorization", "Bearer " <> token),
  #("Content-Type", "application/json"),
])
pub fn host(
  client_request: ClientRequest,
  host_value: String,
) -> ClientRequest

Set the host for the request

Sets the server hostname or IP address. This is required for all requests.

Parameters

  • client_request: The request to modify
  • host_value: The hostname (e.g., “api.example.com” or “192.168.1.1”)

Returns

A new ClientRequest with the host updated.

Example

import dream_http_client/client

client.new()
|> client.host("api.example.com")
pub fn is_stream_active(handle: StreamHandle) -> Bool

Check if a stream is still active

Returns True if the stream process is still running, False otherwise.

Example

let assert Ok(stream) = client.start_stream(request)
case client.is_stream_active(stream) {
  True -> io.println("Stream still running")
  False -> io.println("Stream completed")
}
pub fn method(
  client_request: ClientRequest,
  method_value: http.Method,
) -> ClientRequest

Set the HTTP method for the request

Configures the HTTP method (GET, POST, PUT, DELETE, etc.) for the request.

Parameters

  • client_request: The request to modify
  • method_value: The HTTP method to use

Returns

A new ClientRequest with the method updated.

Example

import dream_http_client/client
import gleam/http

client.new()
|> client.method(http.Post)
pub fn new() -> ClientRequest

Default client request configuration

Creates a new ClientRequest with sensible defaults:

  • Method: GET
  • Scheme: HTTPS
  • Host: “localhost”
  • Port: None (uses default for scheme)
  • Path: “” (empty)
  • Query: None
  • Headers: [] (empty)
  • Body: “” (empty)
  • Timeout: None (uses default 30000ms)

Use this as the starting point for building requests with the builder pattern.

Example

import dream_http_client/client.{host, method, new, path}
import gleam/http.{Get}

new()
|> host("api.example.com")
|> path("/users/123")
|> method(Get)
pub fn on_stream_chunk(
  client_request: ClientRequest,
  callback: fn(BitArray) -> Nil,
) -> ClientRequest

Set callback for stream chunk event

Sets a function to be called for each data chunk received from the stream. This is where you process the actual response data.

Parameters

  • client_request: The request to modify
  • callback: Function called with each chunk of data

Example

client.new()
|> client.host("api.openai.com")
|> client.on_stream_chunk(fn(data) {
  let text = bytes_tree.from_bit_array(data) |> bytes_tree.to_string
  io.print(text)
})
|> client.start_stream()
pub fn on_stream_end(
  client_request: ClientRequest,
  callback: fn(List(Header)) -> Nil,
) -> ClientRequest

Set callback for stream end event

Sets a function to be called when a stream completes successfully. Optional - if not set, stream completion is ignored.

Parameters

  • client_request: The request to modify
  • callback: Function called with trailing headers when stream completes

Example

client.new()
|> client.host("api.example.com")
|> client.on_stream_end(fn(_headers) {
  io.println("Stream completed")
})
|> client.start_stream()
pub fn on_stream_error(
  client_request: ClientRequest,
  callback: fn(String) -> Nil,
) -> ClientRequest

Set callback for stream error event

Sets a function to be called if the stream fails with an error. Handles both HTTP errors and network errors.

Parameters

  • client_request: The request to modify
  • callback: Function called with error reason if stream fails

Example

client.new()
|> client.host("api.example.com")
|> client.on_stream_error(fn(reason) {
  io.println_error("Stream failed: " <> reason)
})
|> client.start_stream()
pub fn on_stream_start(
  client_request: ClientRequest,
  callback: fn(List(Header)) -> Nil,
) -> ClientRequest

Set callback for stream start event

Sets a function to be called when a stream starts and headers are received. Optional - if not set, stream start is ignored.

Parameters

  • client_request: The request to modify
  • callback: Function called with response headers when stream starts

Example

client.new()
|> client.host("api.example.com")
|> client.on_stream_start(fn(headers) {
  io.println("Stream started with " <> int.to_string(list.length(headers)) <> " headers")
})
|> client.start_stream()
pub fn path(
  client_request: ClientRequest,
  path_value: String,
) -> ClientRequest

Set the path for the request

Sets the request path. Should start with “/” for absolute paths.

Parameters

  • client_request: The request to modify
  • path_value: The path (e.g., “/api/users” or “/api/users/123”)

Returns

A new ClientRequest with the path updated.

Example

import dream_http_client/client

client.new()
|> client.path("/api/users/123")
pub fn port(
  client_request: ClientRequest,
  port_value: Int,
) -> ClientRequest

Set the port for the request

Sets a custom port number. If not set, defaults to 80 for HTTP and 443 for HTTPS. Only set this if you’re using a non-standard port.

Parameters

  • client_request: The request to modify
  • port_value: The port number (e.g., 8080, 3000)

Returns

A new ClientRequest with the port updated.

Example

import dream_http_client/client

client.new()
|> client.host("localhost")
|> client.port(3000)  // Use port 3000 instead of default
pub fn query(
  client_request: ClientRequest,
  query_value: String,
) -> ClientRequest

Set the query string for the request

Sets the query string portion of the URL. Do not include the leading “?”.

Parameters

  • client_request: The request to modify
  • query_value: The query string (e.g., “page=1&limit=10”)

Returns

A new ClientRequest with the query string updated.

Example

import dream_http_client/client

client.new()
|> client.path("/api/users")
|> client.query("page=1&limit=10")
pub fn recorder(
  client_request: ClientRequest,
  recorder_value: recorder.Recorder,
) -> ClientRequest

Set the recorder for the request

Attaches a recorder to the request for recording or playback. The recorder must be started with recorder.start() before use.

Parameters

  • client_request: The request to modify
  • recorder_value: The recorder to attach

Returns

A new ClientRequest with the recorder attached.

Example

import dream_http_client/client
import dream_http_client/client.{host, recorder}
import dream_http_client/recorder.{directory, mode, start}

let assert Ok(rec) =
  recorder.new()
  |> directory("mocks")
  |> mode("record")
  |> start()

client.new() |> host("api.example.com") |> recorder(rec)
pub fn scheme(
  client_request: ClientRequest,
  scheme_value: http.Scheme,
) -> ClientRequest

Set the scheme (protocol) for the request

Configures whether to use HTTP or HTTPS. Defaults to HTTPS for security.

Parameters

  • client_request: The request to modify
  • scheme_value: The protocol scheme (http.Http or http.Https)

Returns

A new ClientRequest with the scheme updated.

Example

import dream_http_client/client
import gleam/http

client.new()
|> client.scheme(http.Http)  // Use HTTP instead of HTTPS
pub fn send(
  client_request: ClientRequest,
) -> Result(String, String)

Make a blocking HTTP request and get the complete response

Sends an HTTP request and collects all response chunks, returning the complete response body as a string. This is ideal for:

  • JSON API responses
  • Small files or documents
  • Any case where you need the full response before processing

For large responses or when you need non-blocking streaming, use stream_yielder() or start_stream() instead.

Parameters

  • client_request: The configured HTTP request

Returns

  • Ok(String): The complete response body as a string
  • Error(String): An error message if the request failed

Example

import dream_http_client/client.{host, path, add_header, send}
import gleam/json.{decode}

let result = client.new()
  |> host("api.example.com")
  |> path("/users/123")
  |> add_header("Authorization", "Bearer " <> token)
  |> send()

case result {
  Ok(body) -> {
    case decode(body, user_decoder) {
      Ok(user) -> Ok(user)
      Error(json_error) ->
        Error("Invalid JSON response: " <> string.inspect(json_error))
    }
  }
  Error(error_message) -> Error("Request failed: " <> error_message)
}
pub fn start_stream(
  request: ClientRequest,
) -> Result(StreamHandle, String)

Start an HTTP stream with callback handlers

Spawns a dedicated process to handle HTTP streaming and calls your callbacks as messages arrive. This is the recommended API for streaming.

Returns a StreamHandle immediately (non-blocking). The stream runs in a separate process, and your callbacks execute in that process.

Parameters

  • request: The configured HTTP request with callbacks set via builder pattern

Returns

  • Ok(StreamHandle): Stream started successfully
  • Error(String): Failed to start stream

Example

let assert Ok(stream) = client.new()
  |> client.host("api.openai.com")
  |> client.path("/v1/chat/completions")
  |> client.on_stream_chunk(fn(data) {
    case bit_array.to_string(data) {
      Ok(text) -> io.print(text)
      Error(_) -> Nil
    }
  })
  |> client.on_stream_error(fn(reason) {
    io.println_error("Error: " <> reason)
  })
  |> client.start_stream()

// Later: cancel if needed
client.cancel_stream_handle(stream)
pub fn stream_yielder(
  client_request: ClientRequest,
) -> yielder.Yielder(Result(bytes_tree.BytesTree, String))

Stream HTTP response chunks using a yielder

Sends an HTTP request and returns a yielder that produces chunks of the response body as they arrive from the server. This allows you to process large responses incrementally without loading the entire response into memory.

Use this for simple sequential streaming:

  • AI/LLM inference endpoints (stream tokens)
  • Simple file downloads
  • Scripts or one-off operations

For OTP actors with concurrency, use start_stream() instead.

Error Semantics

The yielder produces Result(BytesTree, String) for each chunk:

  • Ok(chunk) - Successful chunk, more may follow
  • Error(reason) - Terminal error, stream is done

After an Error, the yielder immediately returns Done on the next call. This design reflects that HTTP stream errors (timeouts, connection drops, etc.) are not recoverable - you cannot continue reading from a broken stream.

Normal stream completion: When the stream finishes successfully, the yielder returns Done (no more items). The stream does NOT yield an error for normal completion.

Possible error reasons (actual errors only):

  • "timeout" - Request timed out
  • Connection errors from httpc

Parameters

  • client_request: The configured HTTP request

Returns

A Yielder that produces Result(BytesTree, String). Always check each result - errors are terminal and mean the stream has ended.

Examples

Streaming and processing chunks as they arrive:

import dream_http_client/client.{host, path, stream_yielder}
import gleam/yielder.{each}
import gleam/bytes_tree.{to_string}
import gleam/io.{print, println_error}

client.new()
  |> host("api.openai.com")
  |> path("/v1/chat/completions")
  |> stream_yielder()
  |> each(fn(result) {
    case result {
      Ok(chunk) -> print(to_string(chunk))
      Error(error_reason) -> {
        println_error("Stream error: " <> error_reason)
        // Stream is now done, no more chunks will arrive
      }
    }
  })

Collecting all chunks into a list:

import dream_http_client/client.{host, path, stream_yielder}
import gleam/yielder
import gleam/list
import gleam/bytes_tree
import gleam/string

// The stream automatically completes when done - no need to use take()!
let chunks = 
  client.new()
  |> host("example.com")
  |> path("/data")
  |> stream_yielder()
  |> yielder.to_list()

// Handle results
case list.try_map(chunks, fn(result) { result }) {
  Ok(chunk_list) -> {
    // Concatenate all chunks
    let body = 
      chunk_list
      |> list.map(bytes_tree.to_string)
      |> list.map(fn(chunk_result) { result.unwrap(chunk_result, "") })
      |> string.join("")
    Ok(body)
  }
  Error(error_reason) -> Error("Stream failed: " <> error_reason)
}
pub fn timeout(
  client_request: ClientRequest,
  timeout_ms: Int,
) -> ClientRequest

Set the timeout for the request in milliseconds

Sets how long to wait for a response before timing out. If not set, defaults to 30000ms (30 seconds).

Parameters

  • timeout_ms: Timeout duration in milliseconds

Example

import dream_http_client/client.{host, timeout}

client.new()
|> host("slow-api.example.com")
|> timeout(60_000)  // 60 second timeout
Search Document