gftp/stream

The stream module provides an interface for working with a network stream, which can either be a TCP stream or a TLS stream. It abstracts away the differences between the two types of streams and provides a unified interface for reading and writing data.

Message-based mode

For integration with OTP actors and other message-based architectures, the stream module provides a non-blocking alternative to synchronous receive. Instead of blocking the calling process, you can use receive_next_packet_as_message to arm the socket for a single message delivery, then use select_stream_messages to build a process.Selector that maps incoming socket messages to your actor’s message type.

This is the standard BEAM approach: set {active, once} on the socket, receive one message, then re-arm. This gives backpressure control and integrates naturally with gleam_erlang selectors and OTP actors.

Types

The data stream used for communication. It is either a Tcp stream for plain connections or an Ssl stream for FTPS.

pub type DataStream {
  Ssl(ssl: kafein.SslSocket, tcp: mug.Socket)
  Tcp(socket: mug.Socket)
}

Constructors

  • Ssl(ssl: kafein.SslSocket, tcp: mug.Socket)

    Contains the SSL socket and the underlying TCP socket. The TCP socket is necessary in case we want to switch back to a plain TCP stream.

  • Tcp(socket: mug.Socket)
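
Because both constructors are public, callers can pattern match on a DataStream when they need to know which transport is in use. A minimal sketch, where the helper name is_secure is hypothetical and not part of the module:

```gleam
import gftp/stream.{type DataStream, Ssl, Tcp}

// Hypothetical helper (not part of gftp): reports whether the stream is
// currently wrapped in TLS.
pub fn is_secure(data_stream: DataStream) -> Bool {
  case data_stream {
    // The Ssl variant carries both the SSL socket and the underlying TCP socket.
    Ssl(..) -> True
    Tcp(..) -> False
  }
}
```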

A unified message type for data received asynchronously from a DataStream.

Used with receive_next_packet_as_message and select_stream_messages for non-blocking, message-based I/O that integrates with OTP actors.

pub type StreamMessage {
  Packet(BitArray)
  StreamClosed
  StreamError(result.FtpError)
}

Constructors

  • Packet(BitArray)

    Data received from the stream.

  • StreamClosed

    The remote peer closed the connection.

  • StreamError(result.FtpError)

    An error occurred on the stream.

Values

pub fn controlling_process(
  stream: DataStream,
  pid: process.Pid,
) -> Result(Nil, mug.Error)

Transfer socket ownership to a different process.

This is necessary when a data stream is created in one process (e.g. an OTP actor) but message-based I/O (receive_next_packet_as_message) will be used from another. The receiving process must be the socket’s controlling process to get {active, once} messages.

pub fn downgrade_to_tcp(stream: DataStream) -> DataStream

Downgrades an SSL stream to a TCP stream. If the stream is already a TCP stream, it returns it as is.

pub fn local_address(
  stream: DataStream,
) -> Result(#(String, Int), mug.Error)

Retrieves the local address (IP and port) of the underlying TCP socket.

pub fn peer_address(
  stream: DataStream,
) -> Result(#(String, Int), mug.Error)

Retrieves the peer address (IP and port) of the underlying TCP socket.
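
As an illustration, the two address functions can be combined to build a log-friendly description of a connection. This is only a sketch: the function name describe_connection and the output format are assumptions made for the example.

```gleam
import gftp/stream.{type DataStream}
import gleam/int
import gleam/result
import mug

// Hypothetical helper: formats "local_ip:port -> peer_ip:port".
pub fn describe_connection(
  data_stream: DataStream,
) -> Result(String, mug.Error) {
  // Both calls return Result(#(String, Int), mug.Error), so they chain
  // with result.try without any error conversion.
  use #(local_ip, local_port) <- result.try(stream.local_address(data_stream))
  use #(peer_ip, peer_port) <- result.try(stream.peer_address(data_stream))
  Ok(
    local_ip
    <> ":"
    <> int.to_string(local_port)
    <> " -> "
    <> peer_ip
    <> ":"
    <> int.to_string(peer_port),
  )
}
```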

pub fn receive(
  stream: DataStream,
  timeout: Int,
) -> Result(BitArray, mug.Error)

Receive a message from the peer.

Errors if the socket is closed, if the timeout is reached, or if any other error occurs during the receive operation. The error is returned as a mug.Error. On success, it returns the received data as a BitArray.

pub fn receive_exact(
  stream: DataStream,
  size: Int,
  timeout: Int,
) -> Result(BitArray, mug.Error)

Receive an exact number of bytes from the peer.

Errors if the socket is closed, if the timeout is reached, or if any other error occurs during the receive operation. The error is returned as a mug.Error. On success, it returns the received data as a BitArray.

pub fn receive_next_packet_as_message(stream: DataStream) -> Nil

Set {active, once} on the underlying socket so that the next packet is delivered as an Erlang message to the calling process instead of being buffered for a synchronous receive call.

After each message arrives you must call this again to receive the next one. Use select_stream_messages to build a selector that maps the raw socket messages into StreamMessage values.
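
The arm, receive, re-arm cycle described above could look roughly like the following outside of an actor. This is a sketch under assumptions: the helper name collect is hypothetical, the 5000 timeout is an arbitrary value assumed to be in milliseconds, and process.selector_receive is the name used by recent gleam_erlang releases (older releases call it process.select).

```gleam
import gftp/stream.{type DataStream, Packet, StreamClosed, StreamError}
import gleam/bit_array
import gleam/erlang/process

// Hypothetical helper: accumulates packets until the peer closes the stream.
// Must run in the socket's controlling process.
pub fn collect(
  data_stream: DataStream,
  acc: BitArray,
) -> Result(BitArray, Nil) {
  // Identity mapper, so the selector yields StreamMessage values directly.
  let selector =
    process.new_selector()
    |> stream.select_stream_messages(fn(message) { message })

  // Arm the socket for exactly one message delivery.
  stream.receive_next_packet_as_message(data_stream)

  // Assumption: selector_receive as in recent gleam_erlang releases.
  case process.selector_receive(selector, 5000) {
    // Got data: append it and loop, which re-arms the socket.
    Ok(Packet(data)) -> collect(data_stream, bit_array.append(acc, data))
    // Peer closed the connection: return everything collected so far.
    Ok(StreamClosed) -> Ok(acc)
    // Stream error or no message within the timeout: give up.
    Ok(StreamError(_)) -> Error(Nil)
    Error(Nil) -> Error(Nil)
  }
}
```

Because the socket is only re-armed after a packet has been handled, the sender cannot flood the process mailbox faster than the loop consumes data.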

pub fn select_stream_messages(
  selector: process.Selector(t),
  mapper: fn(StreamMessage) -> t,
) -> process.Selector(t)

Add handlers for both TCP and SSL socket messages to a process.Selector.

The mapper function converts a StreamMessage into your actor’s message type. Handlers for both TCP and SSL messages are registered; only the one matching the actual socket type will ever fire.

import gleam/erlang/process
import gftp/stream.{type StreamMessage}

type MyMessage {
  GotStream(StreamMessage)
  // ... other variants
}

// Build a selector that wraps every stream message in GotStream.
fn build_selector() -> process.Selector(MyMessage) {
  process.new_selector()
  |> stream.select_stream_messages(GotStream)
}

pub fn send(
  stream: DataStream,
  data: BitArray,
) -> Result(Nil, mug.Error)

Sends data over the provided stream. It handles both SSL and TCP streams, abstracting away the differences between them.

Returns a Result indicating success (Nil) or an error (mug.Error) if the send operation fails.
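
Since send and receive share the same error type, a request/response exchange chains naturally with result.try. The sketch below is illustrative only: send_command is a hypothetical helper, and the 5000 timeout is an arbitrary value assumed to be in milliseconds.

```gleam
import gftp/stream.{type DataStream}
import gleam/bit_array
import gleam/result
import mug

// Hypothetical helper: sends one CRLF-terminated command and waits
// for whatever the peer sends back next.
pub fn send_command(
  data_stream: DataStream,
  command: String,
) -> Result(BitArray, mug.Error) {
  let payload = bit_array.from_string(command <> "\r\n")
  // A failed send short-circuits before the receive is attempted.
  use _ <- result.try(stream.send(data_stream, payload))
  // Assumption: timeout expressed in milliseconds.
  stream.receive(data_stream, 5000)
}
```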

pub fn set_line_mode(stream: DataStream) -> Nil

Set the socket to line-delimited packet mode ({packet, line}). In this mode, receive returns one complete line per call.

pub fn set_raw_mode(stream: DataStream) -> Nil

Set the socket back to raw packet mode ({packet, raw}). In this mode, receive returns whatever data is available.
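
One way to use the two modes together is to switch to line mode for a single read and then restore raw mode. A minimal sketch, assuming a hypothetical helper named read_line, an arbitrary 5000 timeout assumed to be in milliseconds, and UTF-8 text on the wire:

```gleam
import gftp/stream.{type DataStream}
import gleam/bit_array

// Hypothetical helper: reads one line as a String, then restores raw mode.
pub fn read_line(data_stream: DataStream) -> Result(String, Nil) {
  stream.set_line_mode(data_stream)
  // Assumption: timeout expressed in milliseconds.
  let line = stream.receive(data_stream, 5000)
  // Restore raw mode whether or not the read succeeded.
  stream.set_raw_mode(data_stream)
  case line {
    // to_string fails with Error(Nil) if the bytes are not valid UTF-8.
    Ok(bytes) -> bit_array.to_string(bytes)
    Error(_) -> Error(Nil)
  }
}
```

The returned string is expected to still contain the line terminator, so callers may want to trim it.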

pub fn shutdown(stream: DataStream) -> Result(Nil, mug.Error)

Shuts down the provided stream. It works the same way for SSL and TCP streams, so callers do not need to handle the two cases separately.

pub fn upgrade_to_ssl(
  stream: DataStream,
  options: kafein.WrapOptions,
) -> Result(DataStream, kafein.Error)

Upgrades a TCP stream to an SSL stream using the provided options. If the stream is already an SSL stream, it returns it as is.
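
Note that upgrade_to_ssl reports failures as kafein.Error while send and shutdown use mug.Error, so a pipeline that mixes them needs a common error type. The sketch below shows one possible way to do that; SessionError and send_securely are hypothetical names, and the WrapOptions value is taken as a parameter rather than constructed here.

```gleam
import gftp/stream.{type DataStream}
import gleam/result
import kafein
import mug

// Hypothetical error type that unifies the two underlying error types.
pub type SessionError {
  TlsFailed(kafein.Error)
  SocketFailed(mug.Error)
}

// Hypothetical helper: upgrade, send one payload, then shut down.
pub fn send_securely(
  data_stream: DataStream,
  options: kafein.WrapOptions,
  payload: BitArray,
) -> Result(Nil, SessionError) {
  // Returns the stream unchanged if it is already an SSL stream.
  use secured <- result.try(
    stream.upgrade_to_ssl(data_stream, options)
    |> result.map_error(TlsFailed),
  )
  use _ <- result.try(
    stream.send(secured, payload)
    |> result.map_error(SocketFailed),
  )
  stream.shutdown(secured)
  |> result.map_error(SocketFailed)
}
```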
