gftp/stream
The stream module provides an interface for working with a network stream, which can either be a TCP stream or a TLS stream. It abstracts away the differences between the two types of streams and provides a unified interface for reading and writing data.
Message-based mode
For integration with OTP actors and other message-based architectures, the stream module provides
a non-blocking alternative to synchronous receive. Instead of blocking the calling process,
you can use receive_next_packet_as_message to arm the socket for a single message delivery,
then use select_stream_messages to build a process.Selector that maps incoming socket
messages to your actor’s message type.
This is the standard BEAM approach: set {active, once} on the socket, receive one message,
then re-arm. This gives backpressure control and integrates naturally with gleam_erlang
selectors and OTP actors.
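The arm, receive, re-arm cycle described above can be sketched as a small recursive loop. This is an illustration under assumptions, not the library's own implementation: `collect_packets` is a hypothetical helper, the 5000 timeout is assumed to be in milliseconds, and `process.selector_receive` is the name used by gleam_erlang 1.x (older releases named it differently). The calling process is assumed to be the socket's controlling process.

```gleam
import gftp/stream.{type DataStream, Packet, StreamClosed, StreamError}
import gleam/erlang/process
import gleam/list

// Hypothetical helper: gathers packets until the peer closes the stream.
pub fn collect_packets(
  data_stream: DataStream,
  received: List(BitArray),
) -> Result(List(BitArray), Nil) {
  // Arm the socket for exactly one message ({active, once}).
  stream.receive_next_packet_as_message(data_stream)

  // Map raw socket messages straight to StreamMessage values.
  let selector =
    process.new_selector()
    |> stream.select_stream_messages(fn(message) { message })

  // Wait for a single message, then re-arm by recursing.
  case process.selector_receive(selector, 5000) {
    Ok(Packet(data)) -> collect_packets(data_stream, [data, ..received])
    Ok(StreamClosed) -> Ok(list.reverse(received))
    // Stream error or timeout: give up without details in this sketch.
    Ok(StreamError(_)) | Error(Nil) -> Error(Nil)
  }
}
```

Because the socket is re-armed only after a packet has been handled, the consumer sets the pace at which data is pulled from the socket.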
Types
The data stream used for communication. It is either Tcp for plain connections or Ssl for FTPS.
pub type DataStream {
  Ssl(ssl: kafein.SslSocket, tcp: mug.Socket)
  Tcp(socket: mug.Socket)
}
Constructors
- Ssl(ssl: kafein.SslSocket, tcp: mug.Socket)
  Contains the SSL socket and the underlying TCP socket. The TCP socket is kept so that the stream can be switched back to plain TCP.
- Tcp(socket: mug.Socket)
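Since DataStream is an ordinary custom type, callers can pattern match on its constructors. A minimal sketch, where `is_encrypted` is a hypothetical helper and not part of the module:

```gleam
import gftp/stream.{type DataStream, Ssl, Tcp}

// Hypothetical helper: reports whether the stream is currently TLS-wrapped.
pub fn is_encrypted(data_stream: DataStream) -> Bool {
  case data_stream {
    // The Ssl variant carries both the SSL socket and the TCP socket.
    Ssl(..) -> True
    Tcp(..) -> False
  }
}
```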
A unified message type for data received asynchronously from a DataStream.
Used with receive_next_packet_as_message and select_stream_messages for
non-blocking, message-based I/O that integrates with OTP actors.
pub type StreamMessage {
  Packet(BitArray)
  StreamClosed
  StreamError(result.FtpError)
}
Constructors
- Packet(BitArray)
  Data received from the stream.
- StreamClosed
  The remote peer closed the connection.
- StreamError(result.FtpError)
  An error occurred on the stream.
Values
pub fn controlling_process(
stream: DataStream,
pid: process.Pid,
) -> Result(Nil, mug.Error)
Transfer socket ownership to a different process.
This is necessary when a data stream is created in one process (e.g. an OTP actor)
but message-based I/O (receive_next_packet_as_message) will be used from another.
The receiving process must be the socket’s controlling process to get {active, once} messages.
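A sketch of the hand-over, split into the two sides involved. Both function names are hypothetical. It assumes the usual BEAM rule that only the current owner of a socket can transfer ownership, so `hand_over` should run in the process that created the stream.

```gleam
import gftp/stream.{type DataStream}
import gleam/erlang/process
import mug

// Runs in the process that currently owns the socket.
pub fn hand_over(
  data_stream: DataStream,
  reader: process.Pid,
) -> Result(Nil, mug.Error) {
  stream.controlling_process(data_stream, reader)
}

// Runs in the `reader` process after ownership has been transferred.
pub fn start_reading(data_stream: DataStream) -> Nil {
  // From now on the next packet arrives as a message in this process.
  stream.receive_next_packet_as_message(data_stream)
}
```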
pub fn downgrade_to_tcp(stream: DataStream) -> DataStream
Downgrades an SSL stream to a TCP stream. If the stream is already a TCP stream, it returns it as is.
pub fn local_address(
stream: DataStream,
) -> Result(#(String, Int), mug.Error)
Retrieves the local address (IP and port) of the underlying TCP socket.
pub fn peer_address(
stream: DataStream,
) -> Result(#(String, Int), mug.Error)
Retrieves the peer address (IP and port) of the underlying TCP socket.
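As an illustration, both lookups can be combined into a log-friendly description of the connection. `describe_connection` is a hypothetical helper, and the output format is only an example.

```gleam
import gftp/stream.{type DataStream}
import gleam/int
import gleam/result
import mug

// Hypothetical helper: renders "local_ip:port -> peer_ip:port".
pub fn describe_connection(
  data_stream: DataStream,
) -> Result(String, mug.Error) {
  use #(local_ip, local_port) <- result.try(stream.local_address(data_stream))
  use #(peer_ip, peer_port) <- result.try(stream.peer_address(data_stream))
  Ok(
    local_ip
    <> ":"
    <> int.to_string(local_port)
    <> " -> "
    <> peer_ip
    <> ":"
    <> int.to_string(peer_port),
  )
}
```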
pub fn receive(
stream: DataStream,
timeout: Int,
) -> Result(BitArray, mug.Error)
Receive a message from the peer.
On success, returns the received data as a BitArray. Returns a mug.Error if the socket is closed, if the timeout is reached, or if any other error occurs during the receive operation.
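A caller will often want to treat a timeout differently from other failures. The sketch below assumes the timeout is expressed in milliseconds, as in mug; `ReadOutcome` and `read_once` are hypothetical names.

```gleam
import gftp/stream.{type DataStream}
import mug

// Hypothetical result type separating timeouts from other errors.
pub type ReadOutcome {
  Data(BitArray)
  TimedOut
  Failed(mug.Error)
}

pub fn read_once(data_stream: DataStream) -> ReadOutcome {
  // Wait up to 5 seconds (assumed unit: milliseconds).
  case stream.receive(data_stream, 5000) {
    Ok(data) -> Data(data)
    Error(mug.Timeout) -> TimedOut
    Error(other) -> Failed(other)
  }
}
```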
pub fn receive_exact(
stream: DataStream,
size: Int,
timeout: Int,
) -> Result(BitArray, mug.Error)
Receive an exact number of bytes from the peer.
On success, returns the received data as a BitArray. Returns a mug.Error if the socket is closed, if the timeout is reached, or if any other error occurs during the receive operation.
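One possible use is reading a payload whose size is already known, for example from an earlier size query. `read_body` is a hypothetical helper, and the timeout is assumed to be in milliseconds. The zero-size guard is a precaution: Erlang's underlying recv treats a length of 0 as "return whatever is available", and whether this wrapper behaves the same way is an assumption.

```gleam
import gftp/stream.{type DataStream}
import mug

// Hypothetical helper: reads exactly `expected_size` bytes.
pub fn read_body(
  data_stream: DataStream,
  expected_size: Int,
) -> Result(BitArray, mug.Error) {
  case expected_size {
    // Nothing to read, so avoid asking the socket for 0 bytes.
    0 -> Ok(<<>>)
    // Wait up to 10 seconds (assumed unit: milliseconds).
    _ -> stream.receive_exact(data_stream, expected_size, 10_000)
  }
}
```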
pub fn receive_next_packet_as_message(stream: DataStream) -> Nil
Set {active, once} on the underlying socket so that the next packet is
delivered as an Erlang message to the calling process instead of being
buffered for a synchronous receive call.
After each message arrives you must call this again to receive the next one.
Use select_stream_messages to build a selector that maps the raw socket
messages into StreamMessage values.
pub fn select_stream_messages(
selector: process.Selector(t),
mapper: fn(StreamMessage) -> t,
) -> process.Selector(t)
Add handlers for both TCP and SSL socket messages to a process.Selector.
The mapper function converts a StreamMessage into your actor’s message
type. Handlers for both TCP and SSL messages are registered; only the one
matching the actual socket type will ever fire.
import gleam/erlang/process
import gftp/stream.{type StreamMessage}

type MyMessage {
  GotStream(StreamMessage)
  // ... other variants
}

let selector =
  process.new_selector()
  |> stream.select_stream_messages(GotStream)
pub fn send(
stream: DataStream,
data: BitArray,
) -> Result(Nil, mug.Error)
Sends data over the provided stream, whether it is an SSL or a TCP stream.
Returns Ok(Nil) on success, or a mug.Error if the send operation fails.
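For instance, an FTP command line is plain text terminated by CRLF, so a thin wrapper over send could look like this. `send_command` is a hypothetical helper, not part of the module.

```gleam
import gftp/stream.{type DataStream}
import gleam/bit_array
import mug

// Hypothetical helper: sends one command line terminated by CRLF.
pub fn send_command(
  data_stream: DataStream,
  command: String,
) -> Result(Nil, mug.Error) {
  stream.send(data_stream, bit_array.from_string(command <> "\r\n"))
}
```

A call such as `send_command(data_stream, "NOOP")` would then write `NOOP\r\n` to the stream.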
pub fn set_line_mode(stream: DataStream) -> Nil
Set the socket to line-delimited packet mode ({packet, line}). In this mode, each receive call returns one complete line.
pub fn set_raw_mode(stream: DataStream) -> Nil
Set the socket back to raw packet mode ({packet, raw}). In this mode, each receive call returns whatever data is available.
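The two modes can be combined to read a single line and then return to raw reads. This is a sketch only: `read_line` is a hypothetical helper, and the timeout is assumed to be in milliseconds.

```gleam
import gftp/stream.{type DataStream}
import mug

// Hypothetical helper: reads one line, then restores raw mode.
pub fn read_line(data_stream: DataStream) -> Result(BitArray, mug.Error) {
  // Ask the socket to deliver one complete line per receive.
  stream.set_line_mode(data_stream)
  let line = stream.receive(data_stream, 5000)
  // Restore raw mode whether or not the receive succeeded.
  stream.set_raw_mode(data_stream)
  line
}
```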
pub fn shutdown(stream: DataStream) -> Result(Nil, mug.Error)
Shuts down the provided stream, whether it is an SSL or a TCP stream.
pub fn upgrade_to_ssl(
stream: DataStream,
options: kafein.WrapOptions,
) -> Result(DataStream, kafein.Error)
Upgrades a TCP stream to an SSL stream using the provided options. If the stream is already an SSL stream, it returns it as is.
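A sketch of a round trip from plain TCP to TLS and back, using downgrade_to_tcp from this module for the return leg. `secure_then_clear` is a hypothetical helper, and the WrapOptions value is taken as a parameter so that no kafein construction details are assumed here.

```gleam
import gftp/stream.{type DataStream}
import gleam/result
import kafein

// Hypothetical helper: upgrades, leaves room for work, then downgrades.
pub fn secure_then_clear(
  data_stream: DataStream,
  options: kafein.WrapOptions,
) -> Result(DataStream, kafein.Error) {
  use secured <- result.try(stream.upgrade_to_ssl(data_stream, options))
  // ... exchange data over `secured` here ...
  Ok(stream.downgrade_to_tcp(secured))
}
```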