dream_http_client
Type-safe HTTP client for Gleam with streaming support.
A standalone HTTP/HTTPS client built on Erlang’s battle-tested httpc. Supports both streaming and non-streaming requests. Built with the same quality standards as Dream, but completely independent—use it in any Gleam project.
Features
- ✅ Three execution modes: Blocking, yielder streaming, and message-based streaming
- ✅ OTP-compatible: Message-based streams work with actors and selectors
- ✅ Concurrent streams: Handle multiple HTTP streams in a single actor
- ✅ Stream cancellation: Cancel in-flight requests cleanly
- ✅ HTTPS support: Full TLS/SSL support via Erlang
- ✅ Builder pattern: Consistent, composable request configuration
- ✅ Type-safe: Result types force error handling
- ✅ Battle-tested: Erlang’s httpc under the hood
- ✅ Framework-independent: Zero dependencies on Dream or other frameworks
Installation
gleam add dream_http_client
Quick Start
Choose the execution mode that fits your use case:
1. Blocking Requests - client.send()
Perfect for JSON APIs and small responses:
import dream_http_client/client.{method, scheme, host, path, add_header, send}
import gleam/http.{Get, Https}
let result = client.new
|> method(Get)
|> scheme(Https)
|> host("api.example.com")
|> path("/users/123")
|> add_header("Authorization", "Bearer " <> token)
|> send()
case result {
Ok(body) -> decode_json(body)
Error(msg) -> handle_error(msg)
}
2. Yielder-Based Streaming - client.stream_yielder()
Perfect for AI responses or simple sequential streaming:
import dream_http_client/client.{host, path, stream_yielder}
import gleam/bit_array.{to_string}
import gleam/bytes_tree.{to_bit_array}
import gleam/io.{print, println}
import gleam/yielder.{each}
client.new
|> host("api.openai.com")
|> path("/v1/chat/completions")
|> stream_yielder()
|> each(fn(chunk_result) {
// Each chunk arrives as a Result: handle data and errors explicitly
case chunk_result {
Ok(chunk) -> {
// Assumes the chunk is valid UTF-8 text
let assert Ok(text) = to_string(to_bit_array(chunk))
print(text)
}
Error(reason) -> println("Stream error: " <> reason)
}
})
Note: This is a pull-based synchronous API. It blocks while waiting for chunks, so it’s not suitable for OTP actors that need to handle multiple concurrent operations.
3. Message-Based Streaming - client.stream_messages()
Perfect for OTP actors handling multiple concurrent streams:
import dream_http_client/client.{
type RequestId, type StreamMessage, StreamStart, Chunk, StreamEnd, StreamError,
host, path, stream_messages, select_stream_messages, cancel_stream
}
import gleam/otp/actor.{continue}
import gleam/erlang/process.{type Selector, new_selector}
import gleam/dict.{type Dict}
pub type Message {
HttpStream(StreamMessage)
OtherMessage(String)
}
pub type State {
State(active_streams: Dict(RequestId, StreamState))
}
fn handle_message(msg: Message, state: State) {
case msg {
HttpStream(stream_msg) -> {
case stream_msg {
StreamStart(req_id, headers) -> {
// Stream started, headers received
let new_state = track_stream(state, req_id, headers)
continue(new_state)
}
Chunk(req_id, data) -> {
// Data chunk received, process it
let new_state = process_chunk(state, req_id, data)
continue(new_state)
}
StreamEnd(req_id, _trailing_headers) -> {
// Stream completed successfully
let new_state = cleanup_stream(state, req_id)
continue(new_state)
}
StreamError(req_id, reason) -> {
// Stream failed, handle error
let new_state = handle_stream_error(state, req_id, reason)
continue(new_state)
}
}
}
OtherMessage(_content) -> {
// Handle other actor messages
continue(state)
}
}
}
fn init_selector() -> Selector(Message) {
new_selector()
|> select_stream_messages(HttpStream)
// Can add more selectors for other message types
}
// Start multiple concurrent streams
pub fn start_streams() {
let assert Ok(req_id_1) = client.new
|> host("api.example.com")
|> path("/stream/1")
|> stream_messages()
let assert Ok(req_id_2) = client.new
|> host("api.example.com")
|> path("/stream/2")
|> stream_messages()
// Both streams send messages to your actor concurrently;
// the RequestIds discriminate between them
#(req_id_1, req_id_2)
}
// Cancel a stream if needed
pub fn cancel_if_needed(req_id: RequestId) {
cancel_stream(req_id)
// No more messages will arrive for this stream
}
Choosing an Execution Mode
| Use Case | Recommended Mode | Why |
|---|---|---|
| JSON API calls | send() | Simple, complete response at once |
| Small file downloads | send() | Load entire file into memory |
| AI/LLM streaming (single request) | stream_yielder() | Sequential token processing |
| Large file downloads | stream_yielder() | Memory-efficient chunked processing |
| OTP actors with multiple streams | stream_messages() | Non-blocking, concurrent, cancellable |
| Long-lived connections | stream_messages() | Can cancel mid-stream |
| Integration with supervisors | stream_messages() | Full OTP compatibility |
Usage
Building Requests
All three execution modes use the same builder pattern:
import dream_http_client/client.{
method, scheme, host, port, path, query, add_header, body, timeout
}
import gleam/http.{Post, Https}
let request = client.new
|> method(Post)
|> scheme(Https)
|> host("api.example.com")
|> port(443)
|> path("/api/users")
|> query("page=1&limit=10")
|> add_header("Content-Type", "application/json")
|> add_header("Authorization", "Bearer " <> token)
|> body(json_body)
|> timeout(60_000) // 60 second timeout for slow APIs
Blocking Requests - send()
Get the complete response body at once:
import dream_http_client/client.{send}
import gleam/json.{decode}
case send(request) {
Ok(body) -> {
// Process complete response
case decode(body, user_decoder) {
Ok(user) -> Ok(user)
Error(_) -> Error("Invalid JSON")
}
}
Error(msg) -> Error("Request failed: " <> msg)
}
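The user_decoder referenced above is left undefined. Here is a minimal sketch, assuming gleam_json 1.x, where json.decode takes a gleam/dynamic decoder (newer gleam_json releases use json.parse with gleam/dynamic/decode instead):
import gleam/dynamic
pub type User {
User(name: String, email: String)
}
// A decoder for {"name": "...", "email": "..."} payloads,
// usable directly as decode(body, user_decoder)
fn user_decoder(value: dynamic.Dynamic) -> Result(User, dynamic.DecodeErrors) {
dynamic.decode2(
User,
dynamic.field("name", dynamic.string),
dynamic.field("email", dynamic.string),
)(value)
}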
Yielder Streaming - stream_yielder()
Process chunks sequentially as they arrive:
import dream_http_client/client.{stream_yielder}
import gleam/bit_array.{to_string}
import gleam/bytes_tree.{to_bit_array}
import gleam/io.{println, println_error}
import gleam/yielder.{each, to_list}
// Process chunks incrementally - each element is a Result
stream_yielder(request)
|> each(fn(chunk_result) {
case chunk_result {
Ok(chunk) -> {
// Assumes the chunk is valid UTF-8 text
let assert Ok(text) = to_string(to_bit_array(chunk))
println("Received: " <> text)
}
Error(reason) -> println_error("Stream error: " <> reason)
}
})
// Or collect all chunks (stream completes automatically)
let chunks = stream_yielder(request)
|> to_list() // List of Result values - the stream stops when done, no take() needed!
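To collect the whole body as a single value, yielder.fold can accumulate the chunks. A sketch that appends successful chunks and ignores a trailing error:
import dream_http_client/client.{stream_yielder}
import gleam/bit_array.{append}
import gleam/bytes_tree.{to_bit_array}
import gleam/yielder.{fold}
// Accumulate every successful chunk into one BitArray
let body = stream_yielder(request)
|> fold(<<>>, fn(acc, chunk_result) {
case chunk_result {
Ok(chunk) -> append(acc, to_bit_array(chunk))
Error(_) -> acc
}
})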
Important: This blocks the calling process while waiting for chunks. Don’t use this in OTP actors that need to handle other messages concurrently.
Message-Based Streaming - stream_messages()
Handle streams in OTP actors with full concurrency:
import dream_http_client/client.{stream_messages, select_stream_messages, cancel_stream}
import gleam/erlang/process.{new_selector}
// 1. Start a stream - returns immediately with RequestId
let assert Ok(req_id) = stream_messages(request)
// 2. Messages arrive in your process mailbox automatically:
// - StreamStart(req_id, headers)
// - Chunk(req_id, data) (zero or more)
// - StreamEnd(req_id, headers) or StreamError(req_id, reason)
// 3. Use select_stream_messages to integrate with your selector
let selector = new_selector()
|> select_stream_messages(MyHttpMessage)
// 4. Cancel if needed
cancel_stream(req_id)
Complete OTP Actor Example
import dream_http_client/client.{
type StreamMessage, StreamStart, Chunk, StreamEnd, StreamError,
host, stream_messages, select_stream_messages
}
import gleam/otp/actor.{type Next, Spec, Ready, Stop, continue, start_spec}
import gleam/erlang/process.{Normal, new_selector}
import gleam/dict.{type Dict, new as new_dict, insert, get, delete}
import gleam/list.{reverse}
import gleam/bit_array.{concat}
import gleam/string.{inspect}
import gleam/io.{println_error}
pub type Message {
StartDownload(url: String)
HttpStream(StreamMessage)
Shutdown
}
pub type StreamState {
StreamState(chunks: List(BitArray))
}
pub type State {
State(streams: Dict(String, StreamState))
}
pub fn start() {
start_spec(Spec(
init: fn() {
let selector = new_selector()
|> select_stream_messages(HttpStream)
// The actor's own Subject is selected by gleam/otp/actor automatically
Ready(State(streams: new_dict()), selector)
},
init_timeout: 1000,
loop: handle_message,
))
}
fn handle_message(msg: Message, state: State) -> Next(Message, State) {
case msg {
StartDownload(url) -> {
// Start a new stream; url must be a bare hostname for host()
// (a real application would keep the RequestId)
let assert Ok(_req_id) = client.new
|> host(url)
|> stream_messages()
continue(state)
}
HttpStream(StreamStart(req_id, _headers)) -> {
// Stream started, initialize state
let req_id_str = inspect(req_id)
let new_streams = insert(
state.streams,
req_id_str,
StreamState(chunks: [])
)
continue(State(streams: new_streams))
}
HttpStream(Chunk(req_id, data)) -> {
// Accumulate chunk
let req_id_str = inspect(req_id)
case get(state.streams, req_id_str) {
Ok(stream_state) -> {
let updated = StreamState(
chunks: [data, ..stream_state.chunks]
)
let new_streams = insert(state.streams, req_id_str, updated)
continue(State(streams: new_streams))
}
Error(_) -> continue(state)
}
}
HttpStream(StreamEnd(req_id, _headers)) -> {
// Stream complete, process all chunks
let req_id_str = inspect(req_id)
case get(state.streams, req_id_str) {
Ok(stream_state) -> {
// Combine all chunks and process
let complete_data = stream_state.chunks
|> reverse()
|> concat()
process_complete_download(complete_data)
// Remove from active streams
let new_streams = delete(state.streams, req_id_str)
continue(State(streams: new_streams))
}
Error(_) -> continue(state)
}
}
HttpStream(StreamError(req_id, reason)) -> {
// Handle error
println_error("Stream error: " <> reason)
let req_id_str = inspect(req_id)
let new_streams = delete(state.streams, req_id_str)
continue(State(streams: new_streams))
}
Shutdown -> Stop(Normal)
}
}
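Driving the actor could then look like this (a sketch in the same module; "example.com" stands in for a real host):
pub fn main() {
// start_spec returns the actor's Subject on success
let assert Ok(actor) = start()
process.send(actor, StartDownload("example.com"))
// ...later, ask the actor to shut down
process.send(actor, Shutdown)
}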
POST Requests with JSON
import dream_http_client/client.{method, host, path, add_header, body, send}
import gleam/http.{Post}
import gleam/json.{object, string, to_string}
let user_json = object([
#("name", string("Alice")),
#("email", string("alice@example.com")),
])
let result = client.new
|> method(Post)
|> host("api.example.com")
|> path("/users")
|> add_header("Content-Type", "application/json")
|> body(to_string(user_json))
|> send()
Configuring Timeouts
All request types support timeout configuration. Default is 30 seconds:
import dream_http_client/client.{host, path, send, stream_yielder, timeout}
// Short timeout for quick APIs
let result = client.new
|> host("api.example.com")
|> path("/health")
|> timeout(5_000) // 5 seconds
|> send()
// Long timeout for slow operations
let stream = client.new
|> host("ml-api.example.com")
|> path("/train-model")
|> timeout(300_000) // 5 minutes
|> stream_yielder() // Works with all execution modes
API Reference
Types
ClientRequest
HTTP request configuration with all components:
- method: HTTP method (GET, POST, etc.)
- scheme: Protocol (HTTP or HTTPS)
- host: Server hostname
- port: Optional port number (defaults to 80 for HTTP, 443 for HTTPS)
- path: Request path
- query: Optional query string
- headers: List of header name-value pairs
- body: Request body as a string
- timeout: Optional timeout in milliseconds (defaults to 30000 ms)
RequestId
Opaque identifier for an active message-based stream. Returned from stream_messages() and included in all StreamMessage variants. Use this to:
- Track multiple concurrent streams
- Cancel specific streams
- Associate messages with the correct request
StreamMessage
Union type for message-based streaming. Every variant except DecodeError carries the RequestId:
- StreamStart(request_id, headers) - Stream started, initial headers received
- Chunk(request_id, data) - Data chunk received (BitArray)
- StreamEnd(request_id, headers) - Stream completed, trailing headers received
- StreamError(request_id, reason) - Stream failed with an error message
- DecodeError(reason) - FFI corruption (should be reported as a bug)
About DecodeError:
DecodeError is a rare error indicating the Erlang→Gleam FFI boundary received
a malformed message from httpc. This is not a normal HTTP error - it means:
- Erlang/OTP version incompatibility with this library
- Memory corruption or other serious runtime issue
- A bug in this library’s FFI code
Unlike other variants, DecodeError does not include a RequestId because the
request ID itself could not be decoded. If you see this error, please report it
as a bug with the full error message at https://github.com/TrustBound/dream/issues
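A handler that matches every variant ensures DecodeError is at least logged rather than silently dropped. A sketch, where the handle_* and report_bug helpers are hypothetical:
fn handle_stream_message(msg: StreamMessage, state: State) {
case msg {
StreamStart(req_id, headers) -> handle_start(state, req_id, headers)
Chunk(req_id, data) -> handle_chunk(state, req_id, data)
StreamEnd(req_id, trailers) -> handle_end(state, req_id, trailers)
StreamError(req_id, reason) -> handle_failure(state, req_id, reason)
// No RequestId here: the message itself could not be decoded.
// Log it and report a bug with the full reason string.
DecodeError(reason) -> report_bug(state, reason)
}
}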
Client Configuration (Builder Pattern)
- client.new - Create a default request configuration
- client.method(req, method) - Set HTTP method (GET, POST, etc.)
- client.scheme(req, scheme) - Set protocol (HTTP or HTTPS)
- client.host(req, host) - Set hostname (required)
- client.port(req, port) - Set port number (optional, defaults to 80/443)
- client.path(req, path) - Set request path
- client.query(req, query) - Set query string
- client.body(req, body) - Set request body
- client.timeout(req, timeout_ms) - Set request timeout in milliseconds
- client.add_header(req, name, value) - Add a single header
- client.headers(req, headers) - Replace all headers at once
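Of these, headers() is the only builder not demonstrated elsewhere in this README. A sketch, assuming headers are passed as a list of name-value tuples as described under ClientRequest:
import dream_http_client/client.{add_header, headers}
// headers() replaces anything set earlier with add_header()
let request = client.new
|> add_header("X-Debug", "1")
|> headers([
#("Content-Type", "application/json"),
#("Accept", "application/json"),
])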
Request Execution
Blocking Mode
- client.send(req) -> Result(String, String)
  - Returns the complete response body as a string
  - Blocks until the entire response is received
  - Returns Ok(body) on success or Error(msg) on failure
Yielder Streaming Mode
- client.stream_yielder(req) -> yielder.Yielder(Result(bytes_tree.BytesTree, String))
  - Returns a yielder that produces a Result for each chunk:
    - Ok(chunk) - Successful chunk, more may follow
    - Error(reason) - Terminal error, the stream is done
  - Normal completion: the yielder returns Done (no error yielded)
  - Pull-based: blocks while waiting for the next chunk
  - Use with yielder.each(), yielder.fold(), yielder.to_list(), etc.
  - Not suitable for OTP actors with concurrent operations
Message-Based Streaming Mode
- client.stream_messages(req) -> Result(RequestId, String)
  - Starts the stream and returns a RequestId immediately
  - Push-based: messages are sent to your process mailbox automatically
  - Non-blocking and fully OTP-compatible
  - Returns Ok(request_id) if the stream started, Error(msg) if it failed
- client.select_stream_messages(selector, mapper) -> Selector(msg)
  - Integrates stream messages into an OTP selector
  - mapper converts StreamMessage to your actor's message type
  - Use in your actor.Spec init function to build the selector
- client.cancel_stream(request_id) -> Nil
  - Cancels an active message-based stream
  - No more messages will arrive for this RequestId
  - Safe to call multiple times on the same ID
Error Handling
All execution modes use Result types to force explicit error handling:
Blocking and Message Start Errors
import dream_http_client/client.{send}
import gleam/io.{println_error}
case send(request) {
Ok(body) -> process_response(body)
Error(msg) -> {
// Common errors:
// - Connection refused
// - DNS resolution failed
// - Invalid URL
// - Timeout
println_error("Request failed: " <> msg)
}
}
Streaming Errors
For message-based streaming, errors arrive as StreamError messages:
import dream_http_client/client.{StreamError}
HttpStream(StreamError(req_id, reason)) -> {
// Handle mid-stream errors:
// - Network interruption
// - Server closed connection
// - Timeout
log_error(req_id, reason)
cleanup_stream(state, req_id)
}
Best Practices
- Always handle Error cases - Network operations can fail
- Set appropriate timeouts - Use client.timeout() to configure request timeouts (default: 30s)
- Handle yielder errors - stream_yielder() produces Result values, check each one
- Cancel streams when done - Free resources with cancel_stream()
- Track active streams - Use a Dict(RequestId, State) in actors
- Handle StreamError and DecodeError - The network can fail mid-stream, and the FFI can report corruption
- Test error paths - Simulate failures in tests with slow/error endpoints
Design Principles
This module follows the same quality standards as Dream:
- No nested cases - Clear, flat control flow throughout
- No anonymous functions - All functions are named for clarity
- Builder pattern - Consistent, composable request configuration
- Type safety - Result types force error handling at compile time
- OTP-first design - Message-based API designed for supervision trees
- Comprehensive testing - Unit tests (no network) + integration tests (real HTTP)
- Battle-tested foundation - Built on Erlang’s production-proven httpc
About Dream
This module was originally built for the Dream web toolkit, but it’s completely standalone and can be used in any Gleam project. It follows Dream’s design principles and will be maintained as part of the Dream ecosystem.
License
MIT License - see LICENSE file for details.