TermUI.Widgets.StreamWidget (TermUI v0.2.0)

View Source

StreamWidget for displaying backpressure-aware streaming data.

StreamWidget can integrate with GenStage for demand-based data streaming, providing controls for stream management and real-time statistics.

Usage

StreamWidget.new(
  buffer_size: 1000,
  overflow_strategy: :drop_oldest
)

Features

  • Backpressure-aware data streaming via GenStage integration
  • Demand-based flow control
  • Buffer management with configurable overflow strategies
  • Pause/resume stream controls
  • Rate limiting for rendering
  • Real-time stream statistics (items/sec)

Keyboard Controls

  • Space: Toggle pause/resume
  • c: Clear buffer
  • s: Toggle stats display
  • Up/Down: Scroll through buffer
  • PageUp/PageDown: Scroll by page
  • Home/End: Jump to first/last item

GenStage Integration

The widget provides a companion consumer module that can be started separately and sends items to the widget:

{:ok, consumer} = StreamWidget.Consumer.start_link(widget_pid)
GenStage.sync_subscribe(consumer, to: producer)

Summary

Functions

Add a single item directly.

Add items directly (for non-GenStage sources).

Get current buffer count.

Clear the buffer.

Get buffer items as a list.

Get current statistics.

Creates new StreamWidget props.

Pause receiving items.

Check if stream is paused.

Resume receiving items.

Set buffer size. Will drop oldest items if new size is smaller.

Get stream state.

Types

overflow_strategy()

@type overflow_strategy() :: :drop_oldest | :drop_newest | :block | :sliding

stats()

@type stats() :: %{
  items_received: non_neg_integer(),
  items_dropped: non_neg_integer(),
  items_per_second: float(),
  buffer_size: non_neg_integer(),
  buffer_capacity: non_neg_integer(),
  last_update: DateTime.t() | nil
}

stream_item()

@type stream_item() :: %{
  id: non_neg_integer(),
  timestamp: DateTime.t(),
  data: any(),
  metadata: map()
}

stream_state()

@type stream_state() :: :idle | :running | :paused | :error

Functions

add_item(state, item)

@spec add_item(map(), any()) :: {:ok, map()}

Add a single item directly.

add_items(state, items)

@spec add_items(map(), [any()]) :: {:ok, map()}

Add items directly (for non-GenStage sources).

buffer_count(state)

@spec buffer_count(map()) :: non_neg_integer()

Get current buffer count.

clear(state)

@spec clear(map()) :: {:ok, map()}

Clear the buffer.

get_items(state)

@spec get_items(map()) :: [stream_item()]

Get buffer items as a list.

get_stats(state)

@spec get_stats(map()) :: stats()

Get current statistics.

new(opts)

@spec new(keyword()) :: map()

Creates new StreamWidget props.

Options

  • :buffer_size - Maximum items in buffer (default: 1000)
  • :overflow_strategy - What to do when buffer is full (default: :drop_oldest)
  • :demand - How many items to request at a time (default: 10)
  • :show_stats - Display statistics bar (default: true)
  • :render_rate_ms - Minimum time between renders (default: 100)
  • :item_renderer - Function to render each item (fn item -> String.t)
  • :on_item - Callback when item is received
  • :on_error - Callback when error occurs

pause(state)

@spec pause(map()) :: {:ok, map()}

Pause receiving items.

paused?(state)

@spec paused?(map()) :: boolean()

Check if stream is paused.

resume(state)

@spec resume(map()) :: {:ok, map()}

Resume receiving items.

set_buffer_size(state, new_size)

@spec set_buffer_size(map(), non_neg_integer()) :: {:ok, map()}

Set buffer size. Will drop oldest items if new size is smaller.

set_overflow_strategy(state, strategy)

@spec set_overflow_strategy(map(), overflow_strategy()) :: {:ok, map()}

Set overflow strategy.

stream_state(state)

@spec stream_state(map()) :: stream_state()

Get stream state.