Raxol.Terminal.Graphics.StreamingData (Raxol v2.0.1)
Real-time data streaming system for terminal graphics visualizations.
This module provides high-performance streaming data infrastructure for:
- Real-time data ingestion from multiple sources
- Buffering and windowing strategies for streaming data
- Data transformation and preprocessing pipelines
- Automatic scaling and sampling for visualization
- Memory-efficient data management
- WebSocket and TCP stream integration
Features
Data Sources
- WebSocket connections for real-time data
- TCP/UDP stream processing
- File system monitoring and streaming
- Database change streams
- Message queue integration (RabbitMQ, Kafka)
- HTTP polling with configurable intervals
Data Processing
- Configurable buffering strategies (time-based, count-based)
- Data windowing (sliding, tumbling, session windows)
- Real-time aggregation (sum, avg, min, max, percentiles)
- Data filtering and transformation pipelines
- Automatic outlier detection and handling
- Data quality monitoring
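To make the windowing and aggregation items above concrete, here is a minimal sketch of tumbling-window aggregation in plain Elixir. It is not Raxol's implementation: the module name `WindowSketch` and the `:timestamp` and `:value` keys on each data point are assumptions made for illustration.

```elixir
defmodule WindowSketch do
  @moduledoc "Illustrative only: tumbling-window aggregation, not Raxol's implementation."

  # Groups points into fixed, non-overlapping windows of `window_size` ms
  # and computes sum/avg/min/max per window.
  # Assumes each point is a map with a non-negative integer :timestamp (ms)
  # and a numeric :value.
  def tumbling(points, window_size) when is_integer(window_size) and window_size > 0 do
    points
    |> Enum.group_by(fn %{timestamp: ts} -> div(ts, window_size) * window_size end)
    |> Enum.sort_by(fn {start_time, _points} -> start_time end)
    |> Enum.map(fn {start_time, window_points} ->
      values = Enum.map(window_points, & &1.value)

      %{
        start_time: start_time,
        end_time: start_time + window_size,
        data_points: window_points,
        aggregations: %{
          sum: Enum.sum(values),
          avg: Enum.sum(values) / length(values),
          min: Enum.min(values),
          max: Enum.max(values)
        }
      }
    end)
  end
end
```

A sliding window would differ in that a point can belong to several overlapping windows; the tumbling case is shown because each point maps to exactly one window.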
Performance Optimization
- Memory-efficient circular buffers
- Configurable sampling and downsampling
- Backpressure handling for high-volume streams
- Batch processing for improved throughput
- Automatic memory management and cleanup
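The circular-buffer idea above can be sketched with Erlang's `:queue`, which is also the buffer type named in `stream_state()`. This is a hedged illustration rather than the module's actual buffer: `BoundedBufferSketch` and its function names are hypothetical, and dropping the oldest item when full is only one possible policy.

```elixir
defmodule BoundedBufferSketch do
  @moduledoc "Illustrative only: a fixed-capacity FIFO buffer built on Erlang's :queue."

  # The size is tracked next to the queue so push/2 avoids the O(n) :queue.len/1 call.
  def new(capacity) when is_integer(capacity) and capacity > 0 do
    %{queue: :queue.new(), size: 0, capacity: capacity}
  end

  # Below capacity: append the item and grow the size.
  def push(%{queue: q, size: size, capacity: cap} = buf, item) when size < cap do
    %{buf | queue: :queue.in(item, q), size: size + 1}
  end

  # At capacity: drop the oldest item, then append the new one.
  def push(%{queue: q} = buf, item) do
    {{:value, _oldest}, rest} = :queue.out(q)
    %{buf | queue: :queue.in(item, rest)}
  end

  # Returns the buffered items, oldest first.
  def to_list(%{queue: q}), do: :queue.to_list(q)
end
```

Memory use stays bounded by `capacity` regardless of how many items are pushed, which is the property the bullet list refers to.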
Usage
alias Raxol.Terminal.Graphics.StreamingData

# Create a streaming data source
{:ok, stream_id} = StreamingData.create_stream(%{
  source_type: :websocket,
  endpoint: "ws://localhost:8080/metrics",
  buffer_size: 1000,
  window_type: :sliding,
  window_size: 60_000, # 1 minute
  sampling_rate: 0.1   # Keep 10% of data points
})
# Connect to visualization
StreamingData.connect_to_visualization(stream_id, chart_id)
# Start streaming
StreamingData.start_stream(stream_id)
Summary
Functions
- child_spec/1: Returns a specification to start this module under a supervisor.
- connect_to_visualization/2: Connects a stream to a data visualization chart.
- create_stream/1: Creates a new streaming data source.
- get_stream_stats/1: Gets current streaming statistics.
- get_windowed_data/2: Gets aggregated data from completed windows.
- handle_manager_cast/2: Callback implementation for Raxol.Core.Behaviours.BaseManager.handle_manager_cast/2.
- start_stream/1: Starts data streaming from the configured source.
- stop_stream/1: Stops data streaming and closes connections.
- update_stream_config/2: Updates stream configuration dynamically.
Types
@type data_source() ::
  :websocket | :tcp | :udp | :file | :database | :http_poll | :message_queue
@type data_window() :: %{
  id: String.t(),
  start_time: non_neg_integer(),
  end_time: non_neg_integer(),
  data_points: [map()],
  aggregations: map(),
  metadata: map()
}
@type sampling_strategy() :: :uniform | :reservoir | :time_based | :adaptive
@type stream_config() :: %{
  source_type: data_source(),
  endpoint: String.t(),
  buffer_size: non_neg_integer(),
  window_type: window_type(),
  window_size: non_neg_integer(),
  sampling_rate: float(),
  sampling_strategy: sampling_strategy(),
  aggregation: [atom()],
  filters: [function()],
  backpressure_strategy: :drop | :buffer | :throttle
}
@type stream_id() :: String.t()
@type stream_state() :: %{
  id: stream_id(),
  config: stream_config(),
  connection: term(),
  buffer: :queue.queue(),
  current_window: data_window(),
  completed_windows: [data_window()],
  connected_visualizations: [String.t()],
  statistics: map(),
  last_activity: non_neg_integer()
}
@type window_type() :: :sliding | :tumbling | :session
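The `:reservoir` value of `sampling_strategy()` most likely refers to reservoir sampling, which keeps a fixed-size uniform sample of a stream whose length is not known in advance. The sketch below shows the classic Algorithm R in plain Elixir. How this module implements the strategy is not documented here, so `ReservoirSketch` is a hypothetical stand-in and not the library's code.

```elixir
defmodule ReservoirSketch do
  @moduledoc "Illustrative only: reservoir sampling (Algorithm R)."

  # Returns up to `k` items chosen uniformly at random from `enumerable`
  # in a single pass. The order of the returned items is not meaningful.
  def sample(enumerable, k) when is_integer(k) and k > 0 do
    {reservoir, _seen} =
      Enum.reduce(enumerable, {%{}, 0}, fn item, {res, seen} ->
        if seen < k do
          # Fill the reservoir with the first k items.
          {Map.put(res, seen, item), seen + 1}
        else
          # Pick a slot in 0..seen; the item is kept with probability k / (seen + 1).
          j = :rand.uniform(seen + 1) - 1
          res = if j < k, do: Map.put(res, j, item), else: res
          {res, seen + 1}
        end
      end)

    Map.values(reservoir)
  end
end
```

Memory stays at `k` items no matter how long the stream runs, which is why the technique suits unbounded streams better than collecting everything and sampling afterwards.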
Functions
child_spec/1
Returns a specification to start this module under a supervisor.
See Supervisor.
connect_to_visualization/2
Connects a stream to a data visualization chart.
Examples
# Connect stream to real-time line chart
:ok = StreamingData.connect_to_visualization(stream_id, chart_id)
# Multiple visualizations can connect to the same stream
:ok = StreamingData.connect_to_visualization(stream_id, histogram_id)
@spec create_stream(stream_config()) :: {:ok, stream_id()} | {:error, term()}
Creates a new streaming data source.
Parameters
- config - Stream configuration including source type, endpoints, buffering
Returns
- {:ok, stream_id} - Successfully created stream
- {:error, reason} - Failed to create stream
Examples
# WebSocket stream
{:ok, stream_id} = StreamingData.create_stream(%{
  source_type: :websocket,
  endpoint: "ws://localhost:8080/data",
  buffer_size: 500,
  window_type: :sliding,
  window_size: 30_000, # 30 seconds
  sampling_rate: 0.2,  # Keep 20% of data
  aggregation: [:avg, :max, :min],
  filters: [&filter_valid_data/1, &normalize_timestamps/1]
})
# HTTP polling stream
{:ok, poll_stream} = StreamingData.create_stream(%{
  source_type: :http_poll,
  endpoint: "http://api.example.com/metrics",
  poll_interval: 5000, # Poll every 5 seconds
  buffer_size: 200,
  window_type: :tumbling,
  window_size: 60_000  # 1 minute windows
})
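The WebSocket example references `filter_valid_data/1` and `normalize_timestamps/1` without defining them. The sketch below shows one way such functions could look. The contract is an assumption, since the docs only type `filters` as `[function()]`: here each filter takes a data point map and returns either a (possibly transformed) map or `nil` to drop the point. The `:value` and `:timestamp` keys and the `apply_filters/2` helper are likewise hypothetical.

```elixir
defmodule FilterSketch do
  @moduledoc "Illustrative only: hypothetical filter functions for a stream pipeline."

  # Keeps a point only if it carries a numeric :value; returns nil to drop it.
  def filter_valid_data(%{value: v} = point) when is_number(v), do: point
  def filter_valid_data(_point), do: nil

  # Converts second-resolution timestamps to milliseconds.
  # Heuristic (assumption): integers below 100_000_000_000 are treated as seconds.
  def normalize_timestamps(%{timestamp: ts} = point)
      when is_integer(ts) and ts < 100_000_000_000 do
    %{point | timestamp: ts * 1000}
  end

  def normalize_timestamps(point), do: point

  # Applies the filters in order, stopping as soon as one drops the point.
  def apply_filters(point, filters) do
    Enum.reduce_while(filters, point, fn filter, acc ->
      case filter.(acc) do
        nil -> {:halt, nil}
        next -> {:cont, next}
      end
    end)
  end
end
```

If the library instead expects boolean predicates, `filter_valid_data/1` would return `true` or `false` and the timestamp normalization would belong in a separate transformation step.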
get_stream_stats/1
Gets current streaming statistics.
Returns
- Map containing stream statistics (throughput, buffer usage, window counts, etc.)
Examples
stats = StreamingData.get_stream_stats(stream_id)
# => %{
# throughput: 150.5, # data points per second
# buffer_usage: 0.75, # 75% buffer utilization
# total_windows: 120, # completed windows
# active_connections: 3, # connected visualizations
# data_quality: 0.98 # 98% valid data points
# }
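A statistics map like the one above can drive simple health checks. The following sketch classifies a stats map into a coarse status. It assumes the `:buffer_usage` and `:data_quality` keys shown in the example output; the module name `StatsSketch` and both thresholds are arbitrary illustration values, not Raxol defaults.

```elixir
defmodule StatsSketch do
  @moduledoc "Illustrative only: turning a stats map into a coarse health status."

  # Example thresholds chosen for illustration.
  @buffer_warn 0.9
  @quality_warn 0.95

  # Buffer pressure is checked first because it can lead to dropped data.
  def health(%{buffer_usage: usage, data_quality: quality}) do
    cond do
      usage >= @buffer_warn -> :buffer_pressure
      quality < @quality_warn -> :degraded_quality
      true -> :ok
    end
  end
end
```

With the example output above (75% buffer usage, 98% valid points), `StatsSketch.health/1` would report `:ok`.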
@spec get_windowed_data(stream_id(), map()) :: {:ok, [data_window()]} | {:error, term()}
Gets aggregated data from completed windows.
Parameters
- stream_id - Stream identifier
- options - Query options (time_range, limit, aggregation_types)
Examples
# Get last 10 minutes of aggregated data
end_time = System.system_time(:millisecond)
start_time = end_time - 600_000 # 10 minutes ago
{:ok, windows} = StreamingData.get_windowed_data(stream_id, %{
  time_range: {start_time, end_time},
  aggregations: [:avg, :max],
  limit: 20
})
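Once a list of `data_window()` maps is available, it usually has to be flattened into points a chart can plot. The sketch below does that for one aggregation. It assumes the `aggregations` map is keyed by the same atoms used when requesting them (for example `:avg`), which the type `map()` does not guarantee; `SeriesSketch` is a hypothetical helper, not part of the module.

```elixir
defmodule SeriesSketch do
  @moduledoc "Illustrative only: flattening windows into chart-ready points."

  # Returns [{start_time, value}] sorted by start time. Windows that lack
  # the requested aggregation are skipped by the pattern in the generator.
  def to_series(windows, aggregation) do
    sorted = Enum.sort_by(windows, & &1.start_time)

    for %{start_time: t, aggregations: %{^aggregation => value}} <- sorted do
      {t, value}
    end
  end
end
```

For instance, `SeriesSketch.to_series(windows, :avg)` would yield one `{start_time, avg}` pair per window that carries an `:avg` value.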
handle_manager_cast/2
Callback implementation for Raxol.Core.Behaviours.BaseManager.handle_manager_cast/2.
start_stream/1
Starts data streaming from the configured source.
Examples
:ok = StreamingData.start_stream(stream_id)
stop_stream/1
Stops data streaming and closes connections.
Examples
:ok = StreamingData.stop_stream(stream_id)
update_stream_config/2
Updates stream configuration dynamically.
Examples
# Increase buffer size during high load
:ok = StreamingData.update_stream_config(stream_id, %{
  buffer_size: 2000,
  sampling_rate: 0.5 # Keep 50% of data points
})
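When the configuration is adjusted in response to load, the new `sampling_rate` can be derived from observed throughput instead of being hard-coded. The sketch below computes a rate that keeps roughly a target number of points per second. It assumes `sampling_rate` is the fraction of points kept, as the comments in the examples suggest; `SamplingSketch`, `target_rate`, and `min_rate` are hypothetical names.

```elixir
defmodule SamplingSketch do
  @moduledoc "Illustrative only: deriving a sampling rate from observed throughput."

  # Scales the rate so that about `target_rate` points per second are kept,
  # clamped to the range [min_rate, 1.0].
  def adjusted_rate(throughput, target_rate, min_rate \\ 0.01) when throughput > 0 do
    (target_rate / throughput)
    |> min(1.0)
    |> max(min_rate)
  end
end
```

The result could then be passed as `sampling_rate` in the map given to `update_stream_config/2`. Note that a higher incoming throughput produces a lower rate, so fewer points are kept under load.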