TermUI.Widgets.StreamWidget (TermUI v0.2.0)
StreamWidget for displaying backpressure-aware streaming data.
StreamWidget can integrate with GenStage for demand-based data streaming, providing controls for stream management and real-time statistics.
Usage
StreamWidget.new(
  buffer_size: 1000,
  overflow_strategy: :drop_oldest
)
Features
- Backpressure-aware data streaming via GenStage integration
- Demand-based flow control
- Buffer management with configurable overflow strategies
- Pause/resume stream controls
- Rate limiting for rendering
- Real-time stream statistics (items/sec)
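The render rate limiting listed above can be pictured as a simple time gate. The following is a minimal sketch under stated assumptions, not TermUI's implementation: `RenderThrottleSketch`, `render_due?/3`, and `rendered_at/2` are hypothetical names, and timestamps are plain millisecond integers.

```elixir
defmodule RenderThrottleSketch do
  # Hypothetical helper for illustration; not part of TermUI.

  # A render is due when nothing has been rendered yet, or when at least
  # `rate_ms` milliseconds have passed since the last render.
  def render_due?(nil, _now_ms, _rate_ms), do: true
  def render_due?(last_ms, now_ms, rate_ms), do: now_ms - last_ms >= rate_ms

  # Given item arrival times (ms), return the times at which a render
  # would actually happen under the gate above.
  def rendered_at(timestamps, rate_ms) do
    {rendered, _last} =
      Enum.reduce(timestamps, {[], nil}, fn now, {acc, last} ->
        if render_due?(last, now, rate_ms) do
          {[now | acc], now}
        else
          {acc, last}
        end
      end)

    Enum.reverse(rendered)
  end
end

# Six arrivals with a 100 ms gate trigger only three renders.
IO.inspect(RenderThrottleSketch.rendered_at([0, 30, 90, 100, 150, 210], 100))
# => [0, 100, 210]
```

Items that arrive between renders are still buffered; only the redraw is skipped.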
Keyboard Controls
- Space: Toggle pause/resume
- c: Clear buffer
- s: Toggle stats display
- Up/Down: Scroll through buffer
- PageUp/PageDown: Scroll by page
- Home/End: Jump to first/last item
GenStage Integration
The widget provides a companion consumer module that can be started separately and sends items to the widget:
{:ok, consumer} = StreamWidget.Consumer.start_link(widget_pid)
GenStage.sync_subscribe(consumer, to: producer)
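To illustrate what demand-based flow means for the subscription above, here is a minimal sketch that uses no GenStage at all. It only models the idea that a producer hands over at most as many events as the consumer asked for. `DemandSketch` and its `handle_demand/2` are hypothetical and do not mirror the GenStage callback signature.

```elixir
defmodule DemandSketch do
  # Hypothetical illustration; this is neither GenStage nor TermUI code.

  # The producer holds `pending` events. The consumer asks for `demand`
  # of them and receives at most that many; the rest stay with the producer.
  def handle_demand(pending, demand) when is_integer(demand) and demand > 0 do
    Enum.split(pending, demand)
  end
end

pending = Enum.to_list(1..25)

# The consumer requests 10 items at a time (the documented :demand default).
{batch1, pending} = DemandSketch.handle_demand(pending, 10)
{batch2, pending} = DemandSketch.handle_demand(pending, 10)
{batch3, pending} = DemandSketch.handle_demand(pending, 10)

IO.inspect({length(batch1), length(batch2), length(batch3), pending})
# => {10, 10, 5, []}
```

Because the consumer never receives more than it requested, a slow widget naturally slows the producer instead of being flooded.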
Types
@type overflow_strategy() :: :drop_oldest | :drop_newest | :block | :sliding
@type stats() :: %{
  items_received: non_neg_integer(),
  items_dropped: non_neg_integer(),
  items_per_second: float(),
  buffer_size: non_neg_integer(),
  buffer_capacity: non_neg_integer(),
  last_update: DateTime.t() | nil
}
@type stream_item() :: %{
  id: non_neg_integer(),
  timestamp: DateTime.t(),
  data: any(),
  metadata: map()
}
@type stream_state() :: :idle | :running | :paused | :error
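As an illustration of the overflow_strategy() type, the sketch below shows how :drop_oldest and :drop_newest might behave on a full buffer. It is an assumption-laden model, not the widget's code: `OverflowSketch.push/4` is hypothetical, the buffer is a plain list kept oldest-first, and capacity is assumed to be at least 1. The :block and :sliding strategies are left out because this page does not describe their behavior.

```elixir
defmodule OverflowSketch do
  # Hypothetical bounded buffer for illustration; not part of TermUI.
  # Items are stored oldest-first and `capacity` is assumed to be >= 1.

  # Room left: append the new item regardless of strategy.
  def push(buffer, item, capacity, _strategy) when length(buffer) < capacity,
    do: buffer ++ [item]

  # Buffer full: discard the oldest item to make room for the new one.
  def push([_oldest | rest], item, _capacity, :drop_oldest), do: rest ++ [item]

  # Buffer full: keep the buffer unchanged and discard the incoming item.
  def push(buffer, _item, _capacity, :drop_newest), do: buffer
end

full = [1, 2, 3]

IO.inspect(OverflowSketch.push(full, 4, 3, :drop_oldest))
# => [2, 3, 4]

IO.inspect(OverflowSketch.push(full, 4, 3, :drop_newest))
# => [1, 2, 3]
```

In both full-buffer cases one item is lost, which is presumably what the items_dropped counter in stats() tracks.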
Functions
Add a single item directly.
Add items directly (for non-GenStage sources).
@spec buffer_count(map()) :: non_neg_integer()
Get current buffer count.
Clear the buffer.
@spec get_items(map()) :: [stream_item()]
Get buffer items as a list.
Get current statistics.
Creates new StreamWidget props.
Options
- :buffer_size - Maximum items in buffer (default: 1000)
- :overflow_strategy - What to do when buffer is full (default: :drop_oldest)
- :demand - How many items to request at a time (default: 10)
- :show_stats - Display statistics bar (default: true)
- :render_rate_ms - Minimum time between renders (default: 100)
- :item_renderer - Function to render each item (fn item -> String.t)
- :on_item - Callback when item is received
- :on_error - Callback when error occurs
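A function for the :item_renderer option might look like the sketch below. It assumes the renderer receives a map shaped like stream_item() and must return a string. `ItemRendererSketch.render/1` and the sample item are made up for illustration.

```elixir
defmodule ItemRendererSketch do
  # Hypothetical renderer for illustration; not part of TermUI.
  # Assumes the item has the fields of the stream_item() type.
  def render(%{id: id, timestamp: %DateTime{} = ts, data: data}) do
    "#{id} [#{DateTime.to_iso8601(ts)}] #{inspect(data)}"
  end
end

# A sample item built by hand to match stream_item().
item = %{
  id: 7,
  timestamp: ~U[2024-01-01 12:00:00Z],
  data: {:cpu, 42},
  metadata: %{}
}

IO.puts(ItemRendererSketch.render(item))
# prints: 7 [2024-01-01T12:00:00Z] {:cpu, 42}
```

Such a function could then be supplied as `item_renderer: &ItemRendererSketch.render/1` in the options passed to `StreamWidget.new`.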
Pause receiving items.
Check if stream is paused.
Resume receiving items.
@spec set_buffer_size(map(), non_neg_integer()) :: {:ok, map()}
Set buffer size. If the new size is smaller than the number of buffered items, the oldest items are dropped.
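The effect of shrinking the buffer can be sketched as follows. This is an illustrative model only: `ResizeSketch.shrink/2` is a hypothetical helper that works on a plain oldest-first list and also returns how many items were dropped, whereas set_buffer_size itself operates on the widget state map and returns `{:ok, map()}`.

```elixir
defmodule ResizeSketch do
  # Hypothetical helper for illustration; not part of TermUI.
  # `items` is ordered oldest-first; the newest `new_size` items are kept.
  def shrink(items, new_size) when is_integer(new_size) and new_size >= 0 do
    dropped = max(length(items) - new_size, 0)
    {Enum.drop(items, dropped), dropped}
  end
end

# Five buffered items, new size 3: the two oldest are dropped.
IO.inspect(ResizeSketch.shrink([1, 2, 3, 4, 5], 3))
# => {[3, 4, 5], 2}

# The buffer already fits: nothing is dropped.
IO.inspect(ResizeSketch.shrink([1, 2], 3))
# => {[1, 2], 0}
```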
@spec set_overflow_strategy(map(), overflow_strategy()) :: {:ok, map()}
Set overflow strategy.
@spec stream_state(map()) :: stream_state()
Get stream state.