Streaming

View Source

This guide covers Tinkex's streaming capabilities for working with Server-Sent Events (SSE) from event-stream endpoints.

Overview

Tinkex provides low-level streaming support through the SSE (Server-Sent Events) decoder, which can parse incremental chunks of event data from HTTP streams. The primary use case is consuming real-time event streams from compatible endpoints.

The streaming implementation consists of two main components:

Server-Sent Events Format

Server-Sent Events follow a simple text-based format where each event is separated by double newlines (\n\n, \r\n\r\n, or \r\r). Each event consists of one or more fields:

event: custom_event_name
data: {"key": "value"}
id: event-123
retry: 5000

Field types:

  • event - Event type identifier (optional)
  • data - Event payload, can span multiple lines
  • id - Event ID for tracking (optional)
  • retry - Reconnection delay in milliseconds (optional)
  • Lines starting with : are comments and ignored

Using the SSEDecoder

The SSEDecoder module provides a stateful decoder that can be fed incremental binary chunks. This is useful when processing streaming HTTP responses where data arrives in fragments.

Basic usage

alias Tinkex.Streaming.{SSEDecoder, ServerSentEvent}

# Create a new decoder
decoder = SSEDecoder.new()

# Feed binary chunks as they arrive
chunk1 = "data: {\"message\": \"hello\"}\n\n"
{events1, decoder} = SSEDecoder.feed(decoder, chunk1)

# First event is parsed
[%ServerSentEvent{data: data}] = events1
IO.inspect(data)  # "{\"message\": \"hello\"}"

# Continue feeding more chunks
chunk2 = "event: update\ndata: {\"count\": 42}\n\n"
{events2, decoder} = SSEDecoder.feed(decoder, chunk2)

The ServerSentEvent struct

Each parsed event is represented as a ServerSentEvent struct:

%ServerSentEvent{
  event: "custom",           # Event type (or nil for unnamed events)
  data: "{\"value\": 123}",  # Event payload as string
  id: "evt-001",             # Event ID (or nil)
  retry: 5000                # Retry delay in ms (or nil)
}

Decoding JSON data

Use ServerSentEvent.json/1 to attempt JSON decoding of the event data:

event = %ServerSentEvent{data: "{\"result\": 42}"}

decoded = ServerSentEvent.json(event)
# Returns: %{"result" => 42}

# If JSON parsing fails, returns the raw string
event = %ServerSentEvent{data: "plain text"}
ServerSentEvent.json(event)
# Returns: "plain text"

Handling Partial Chunks

The decoder maintains an internal buffer to handle partial events that span multiple chunks:

decoder = SSEDecoder.new()

# First chunk contains incomplete event
{[], decoder} = SSEDecoder.feed(decoder, "data: {\"par")

# Second chunk completes the event
{events, decoder} = SSEDecoder.feed(decoder, "tial\"}\n\n")

[event] = events
event.data  # "{\"partial\"}"

The decoder automatically:

  • Buffers incomplete events
  • Handles various line ending styles (\n, \r\n, \r)
  • Supports double-newline separators in all formats
  • Processes multiple events in a single chunk

Streaming with the API Client

The Tinkex.API module provides stream_get/2 for consuming SSE endpoints directly:

{:ok, stream_response} = Tinkex.API.stream_get("/api/v1/events", config: config)

# Access stream metadata
stream_response.status       # 200
stream_response.method       # :get
stream_response.url          # Full URL
stream_response.headers      # Response headers map

# Process events from the stream
stream_response.stream
|> Enum.each(fn event ->
  IO.inspect(event, label: "Received event")
end)

Custom event parsing

By default, stream_get/2 parses event data as JSON. You can customize this with the :event_parser option:

# Return raw ServerSentEvent structs
{:ok, response} =
  Tinkex.API.stream_get("/events",
    config: config,
    event_parser: :raw
  )

response.stream
|> Enum.each(fn %ServerSentEvent{} = event ->
  IO.puts("Event type: #{event.event}")
  IO.puts("Data: #{event.data}")
end)

# Use custom parser function
parser = fn event ->
  # Custom transformation logic
  event.data
  |> String.upcase()
end

{:ok, response} =
  Tinkex.API.stream_get("/events",
    config: config,
    event_parser: parser
  )

Example: Processing a stream

alias Tinkex.API

# Configure the client
config = Tinkex.Config.new(
  api_key: System.fetch_env!("TINKER_API_KEY"),
  timeout: 30_000  # Longer timeout for streaming
)

# Connect to an event stream
{:ok, stream_resp} = API.stream_get("/api/v1/notifications", config: config)

# Process events as they arrive
stream_resp.stream
|> Stream.filter(fn event ->
  event["type"] == "notification"
end)
|> Stream.map(fn event ->
  %{
    timestamp: DateTime.utc_now(),
    message: event["message"]
  }
end)
|> Enum.take(10)  # Take first 10 events

Error Handling

Streaming operations can fail at multiple points. Handle errors appropriately:

case API.stream_get("/events", config: config) do
  {:ok, stream_resp} ->
    try do
      stream_resp.stream
      |> Enum.each(&process_event/1)
    rescue
      e in RuntimeError ->
        Logger.error("Stream processing failed: #{Exception.message(e)}")
    end

  {:error, %Tinkex.Error{} = error} ->
    Logger.error("Failed to connect to stream: #{error.message}")
    # Check error.type for specific error categories:
    # :api_connection, :api_status, :validation
end

Connection errors

Common errors when establishing streams:

  • :api_connection - Network/transport errors, failed DNS, timeouts
  • :api_status - HTTP error status codes (4xx, 5xx)
  • :validation - Invalid response format

Processing errors

Errors during stream consumption typically surface as exceptions when enumerating:

{:ok, stream_resp} = API.stream_get("/events", config: config)

# Wrap enumeration in error handling
result =
  try do
    count =
      stream_resp.stream
      |> Enum.count()

    {:ok, count}
  rescue
    e -> {:error, e}
  end

case result do
  {:ok, count} -> IO.puts("Processed #{count} events")
  {:error, e} -> Logger.error("Stream error: #{inspect(e)}")
end

Use Cases

Real-time notifications

# Monitor a notification stream
def monitor_notifications(config) do
  {:ok, stream} = API.stream_get("/notifications", config: config)

  stream.stream
  |> Stream.each(fn notification ->
    send_alert(notification["severity"], notification["message"])
  end)
  |> Stream.run()
end

Event aggregation

# Collect events over a time window
def collect_metrics(config, duration_ms) do
  {:ok, stream} = API.stream_get("/metrics", config: config)

  task = Task.async(fn ->
    stream.stream
    |> Enum.take_while(fn _ ->
      # Could implement time-based cutoff here
      true
    end)
    |> Enum.to_list()
  end)

  Task.await(task, duration_ms)
end

Progressive data loading

# Load large datasets progressively
def stream_dataset(config, dataset_id) do
  path = "/datasets/#{dataset_id}/stream"
  {:ok, stream} = API.stream_get(path, config: config)

  stream.stream
  |> Stream.chunk_every(100)  # Process in batches
  |> Stream.each(&process_batch/1)
  |> Stream.run()
end

Current Limitations

The streaming implementation in Tinkex is intentionally minimal and focused on SSE parsing:

  1. No built-in reconnection - Reconnection logic must be implemented by the caller
  2. No automatic retry - Unlike regular API calls, streaming endpoints don't auto-retry
  3. Buffered delivery - Currently stream_get/2 buffers the full response before parsing
  4. Limited endpoint support - Check API documentation to confirm which endpoints support streaming

For production streaming applications requiring reconnection, heartbeat monitoring, or true incremental processing, consider wrapping the SSE decoder in a GenServer or supervision tree that implements these features.

  • API overview: docs/guides/api_reference.md
  • Error handling and categories: docs/guides/troubleshooting.md
  • Configuration and timeouts: docs/guides/getting_started.md