Raxol.Terminal.Graphics.StreamingData (Raxol v2.0.1)

Real-time data streaming system for terminal graphics visualizations.

This module provides high-performance streaming data infrastructure for:

  • Real-time data ingestion from multiple sources
  • Buffering and windowing strategies for streaming data
  • Data transformation and preprocessing pipelines
  • Automatic scaling and sampling for visualization
  • Memory-efficient data management
  • WebSocket and TCP stream integration

Features

Data Sources

  • WebSocket connections for real-time data
  • TCP/UDP stream processing
  • File system monitoring and streaming
  • Database change streams
  • Message queue integration (RabbitMQ, Kafka)
  • HTTP polling with configurable intervals

Data Processing

  • Configurable buffering strategies (time-based, count-based)
  • Data windowing (sliding, tumbling, session windows)
  • Real-time aggregation (sum, avg, min, max, percentiles)
  • Data filtering and transformation pipelines
  • Automatic outlier detection and handling
  • Data quality monitoring
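
The aggregation kinds listed above can be illustrated with a small standalone sketch. `AggregationSketch` and its `{:percentile, p}` tuple form are hypothetical names chosen for illustration; they are not part of Raxol, and the module's real aggregation code may differ.

```elixir
defmodule AggregationSketch do
  @moduledoc "Hypothetical helper for illustration; not part of Raxol."

  # Computes each requested aggregation over a non-empty list of numbers
  # and returns a map keyed by the aggregation kind.
  def aggregate(values, kinds) when is_list(values) and values != [] do
    Map.new(kinds, fn kind -> {kind, compute(kind, values)} end)
  end

  defp compute(:sum, values), do: Enum.sum(values)
  defp compute(:avg, values), do: Enum.sum(values) / length(values)
  defp compute(:min, values), do: Enum.min(values)
  defp compute(:max, values), do: Enum.max(values)

  # Nearest-rank percentile: the smallest value with at least p% of the
  # sorted values at or below it.
  defp compute({:percentile, p}, values) when p > 0 and p <= 100 do
    sorted = Enum.sort(values)
    rank = ceil(p / 100 * length(sorted))
    Enum.at(sorted, rank - 1)
  end
end
```

Calling `AggregationSketch.aggregate(values, [:avg, :max])` mirrors the shape of the `aggregation: [:avg, :max, :min]` option shown in the `create_stream/1` examples.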

Performance Optimization

  • Memory-efficient circular buffers
  • Configurable sampling and downsampling
  • Backpressure handling for high-volume streams
  • Batch processing for improved throughput
  • Automatic memory management and cleanup

Usage

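# Alias the module so the short name used below resolves
alias Raxol.Terminal.Graphics.StreamingData

# Create a streaming data source
{:ok, stream_id} = StreamingData.create_stream(%{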
  source_type: :websocket,
  endpoint: "ws://localhost:8080/metrics",
  buffer_size: 1000,
  window_type: :sliding,
  window_size: 60_000,  # 1 minute
  sampling_rate: 0.1    # Keep 10% of data points
})

# Connect to a visualization (chart_id is the ID of a chart created elsewhere)
StreamingData.connect_to_visualization(stream_id, chart_id)

# Start streaming
StreamingData.start_stream(stream_id)

Types

data_source()

@type data_source() ::
  :websocket | :tcp | :udp | :file | :database | :http_poll | :message_queue

data_window()

@type data_window() :: %{
  id: String.t(),
  start_time: non_neg_integer(),
  end_time: non_neg_integer(),
  data_points: [map()],
  aggregations: map(),
  metadata: map()
}

sampling_strategy()

@type sampling_strategy() :: :uniform | :reservoir | :time_based | :adaptive
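
As an illustration of what a `:reservoir` strategy usually means, here is a minimal sketch of classic reservoir sampling (Algorithm R). This page does not document how Raxol implements the strategy, so `ReservoirSketch` is a hypothetical standalone module, not the library's code.

```elixir
defmodule ReservoirSketch do
  @moduledoc "Hypothetical helper for illustration; not part of Raxol."

  # Keeps a uniform random sample of at most k items from an enumerable
  # whose length does not need to be known in advance (Algorithm R).
  def sample(enumerable, k) when is_integer(k) and k > 0 do
    {reservoir, _seen} =
      Enum.reduce(enumerable, {%{}, 0}, fn item, {reservoir, seen} ->
        if seen < k do
          # The first k items fill the reservoir directly.
          {Map.put(reservoir, seen, item), seen + 1}
        else
          # Pick a slot in 0..seen; the item is kept only if the slot
          # falls inside the reservoir, which happens with probability k/(seen + 1).
          slot = :rand.uniform(seen + 1) - 1
          reservoir = if slot < k, do: Map.put(reservoir, slot, item), else: reservoir
          {reservoir, seen + 1}
        end
      end)

    Map.values(reservoir)
  end
end
```

Memory use stays bounded by `k` regardless of how many points arrive, which is why this technique suits unbounded streams.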

stream_config()

@type stream_config() :: %{
  source_type: data_source(),
  endpoint: String.t(),
  buffer_size: non_neg_integer(),
  window_type: window_type(),
  window_size: non_neg_integer(),
  sampling_rate: float(),
  sampling_strategy: sampling_strategy(),
  aggregation: [atom()],
  filters: [function()],
  backpressure_strategy: :drop | :buffer | :throttle
}
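
Before passing a map to `create_stream/1`, it can help to check it against the types above. The sketch below is one way to do that under stated assumptions: `StreamConfigSketch`, its error atoms, the rule that `sampling_rate` lies in (0, 1], and the default of `1.0` when the key is absent are all hypothetical and not confirmed by this page.

```elixir
defmodule StreamConfigSketch do
  @moduledoc "Hypothetical pre-flight check; not part of Raxol."

  # Values taken from the data_source() and window_type() types.
  @source_types [:websocket, :tcp, :udp, :file, :database, :http_poll, :message_queue]
  @window_types [:sliding, :tumbling, :session]

  # Returns :ok or {:error, reason} for the first failing check.
  def validate(config) when is_map(config) do
    cond do
      config[:source_type] not in @source_types -> {:error, :invalid_source_type}
      not is_binary(config[:endpoint]) -> {:error, :invalid_endpoint}
      config[:window_type] not in @window_types -> {:error, :invalid_window_type}
      not valid_rate?(Map.get(config, :sampling_rate, 1.0)) -> {:error, :invalid_sampling_rate}
      true -> :ok
    end
  end

  # Assumption: the rate is the fraction of points kept, so it must be in (0, 1].
  defp valid_rate?(rate), do: is_number(rate) and rate > 0 and rate <= 1
end
```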

stream_id()

@type stream_id() :: String.t()

stream_state()

@type stream_state() :: %{
  id: stream_id(),
  config: stream_config(),
  connection: term(),
  buffer: :queue.queue(),
  current_window: data_window(),
  completed_windows: [data_window()],
  connected_visualizations: [String.t()],
  statistics: map(),
  last_activity: non_neg_integer()
}
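
The `buffer` field is an Erlang `:queue`. A minimal sketch of how such a queue can act as a bounded buffer that drops the oldest item when full is shown below. `BoundedBufferSketch` is a hypothetical module, and the drop-oldest policy is an assumption; the page does not state how Raxol's buffer behaves at capacity.

```elixir
defmodule BoundedBufferSketch do
  @moduledoc "Hypothetical helper for illustration; not part of Raxol."

  # The buffer is {queue, size, capacity}. Tracking size alongside the
  # queue avoids calling :queue.len/1, which is O(n).
  def new(capacity) when is_integer(capacity) and capacity > 0 do
    {:queue.new(), 0, capacity}
  end

  # Room left: append the item at the rear.
  def push({queue, size, capacity}, item) when size < capacity do
    {:queue.in(item, queue), size + 1, capacity}
  end

  # Full: drop the oldest item from the front, then append.
  def push({queue, size, capacity}, item) do
    {_dropped, queue} = :queue.out(queue)
    {:queue.in(item, queue), size, capacity}
  end

  # Returns the buffered items, oldest first.
  def to_list({queue, _size, _capacity}), do: :queue.to_list(queue)
end
```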

window_type()

@type window_type() :: :sliding | :tumbling | :session
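
The three window types can be sketched using their usual stream-processing definitions. `WindowSketch`, the `%{timestamp: ...}` point shape, and the `max_gap` parameter are hypothetical, and these definitions have not been checked against Raxol's implementation.

```elixir
defmodule WindowSketch do
  @moduledoc "Hypothetical helper for illustration; not part of Raxol."

  # Tumbling: fixed, non-overlapping windows. Each timestamp falls into
  # exactly one window, returned as {start, end} with an exclusive end.
  def tumbling_window(timestamp, size) when size > 0 do
    start = div(timestamp, size) * size
    {start, start + size}
  end

  # Sliding: the window always covers the last `size` milliseconds before `now`.
  def sliding_window(points, now, size) do
    Enum.filter(points, fn %{timestamp: ts} -> ts > now - size and ts <= now end)
  end

  # Session: consecutive timestamps stay in one session until the gap
  # between neighbours exceeds `max_gap`.
  def session_windows(timestamps, max_gap) do
    timestamps
    |> Enum.sort()
    |> Enum.reduce([], fn
      ts, [[last | _] = session | rest] when ts - last <= max_gap ->
        [[ts | session] | rest]

      ts, sessions ->
        [[ts] | sessions]
    end)
    |> Enum.map(&Enum.reverse/1)
    |> Enum.reverse()
  end
end
```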

Functions

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

See Supervisor.

connect_to_visualization(stream_id, visualization_id)

@spec connect_to_visualization(stream_id(), String.t()) :: :ok | {:error, term()}

Connects a stream to a data visualization chart.

Examples

# Connect stream to real-time line chart
:ok = StreamingData.connect_to_visualization(stream_id, chart_id)

# Multiple visualizations can connect to the same stream
:ok = StreamingData.connect_to_visualization(stream_id, histogram_id)

create_stream(config)

@spec create_stream(stream_config()) :: {:ok, stream_id()} | {:error, term()}

Creates a new streaming data source.

Parameters

  • config - A stream_config() map describing the source type, endpoint, buffering, windowing, sampling, aggregations, and filters

Returns

  • {:ok, stream_id} - Successfully created stream
  • {:error, reason} - Failed to create stream

Examples

# WebSocket stream
{:ok, stream_id} = StreamingData.create_stream(%{
  source_type: :websocket,
  endpoint: "ws://localhost:8080/data",
  buffer_size: 500,
  window_type: :sliding,
  window_size: 30_000,  # 30 seconds
  sampling_rate: 0.2,   # Keep 20% of data
  aggregation: [:avg, :max, :min],
  filters: [&filter_valid_data/1, &normalize_timestamps/1]
})

# HTTP polling stream
{:ok, poll_stream} = StreamingData.create_stream(%{
  source_type: :http_poll,
  endpoint: "http://api.example.com/metrics",
  poll_interval: 5000,  # Poll every 5 seconds
  buffer_size: 200,
  window_type: :tumbling,
  window_size: 60_000   # 1 minute windows
})
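
The WebSocket example references `filter_valid_data/1` and `normalize_timestamps/1` without defining them. The sketch below shows one possible shape for such helpers. The filter contract is not specified on this page, so this assumes each filter takes one data point and returns either the point (possibly transformed) or `nil` to drop it. `FilterSketch`, the `:value` and `:timestamp` keys, and the seconds-versus-milliseconds threshold are all hypothetical.

```elixir
defmodule FilterSketch do
  @moduledoc "Hypothetical filter helpers; not part of Raxol."

  # Keeps only points that carry a numeric :value; returns nil to drop.
  def filter_valid_data(%{value: value} = point) when is_number(value), do: point
  def filter_valid_data(_point), do: nil

  # Heuristic: integer timestamps below 10^11 are treated as seconds
  # and converted to milliseconds; anything else passes through unchanged.
  def normalize_timestamps(%{timestamp: ts} = point)
      when is_integer(ts) and ts < 100_000_000_000 do
    %{point | timestamp: ts * 1000}
  end

  def normalize_timestamps(point), do: point

  # Runs filters in order and stops as soon as one drops the point.
  def apply_filters(point, filters) do
    Enum.reduce_while(filters, point, fn filter, acc ->
      case filter.(acc) do
        nil -> {:halt, nil}
        next -> {:cont, next}
      end
    end)
  end
end
```

Under these assumptions the `filters:` option would be written as `[&FilterSketch.filter_valid_data/1, &FilterSketch.normalize_timestamps/1]`.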

get_stream_stats(stream_id)

@spec get_stream_stats(stream_id()) :: map()

Gets current streaming statistics.

Returns

  • Map containing stream statistics (throughput, buffer usage, window counts, etc.)

Examples

stats = StreamingData.get_stream_stats(stream_id)
# => %{
#   throughput: 150.5,        # data points per second
#   buffer_usage: 0.75,       # 75% buffer utilization
#   total_windows: 120,       # completed windows
#   active_connections: 3,    # connected visualizations
#   data_quality: 0.98        # 98% valid data points
# }

get_windowed_data(stream_id, options \\ %{})

@spec get_windowed_data(stream_id(), map()) ::
  {:ok, [data_window()]} | {:error, term()}

Gets aggregated data from completed windows.

Parameters

  • stream_id - Stream identifier
  • options - Query options (time_range, limit, aggregation_types)

Examples

# Get last 10 minutes of aggregated data
end_time = System.system_time(:millisecond)
start_time = end_time - 600_000  # 10 minutes ago

{:ok, windows} = StreamingData.get_windowed_data(stream_id, %{
  time_range: {start_time, end_time},
  aggregations: [:avg, :max],
  limit: 20
})
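
Once windows are returned, a chart typically needs a time-ordered series. A minimal sketch follows, assuming each window's `aggregations` map is keyed by the aggregation atom (for example `:avg`), which the `data_window()` type does not confirm. `WindowSeriesSketch` is a hypothetical helper.

```elixir
defmodule WindowSeriesSketch do
  @moduledoc "Hypothetical helper for illustration; not part of Raxol."

  # Converts data_window() maps into {midpoint_time, value} pairs sorted
  # by start time, skipping windows that lack the requested aggregation.
  def series(windows, aggregation) do
    windows
    |> Enum.filter(&Map.has_key?(&1.aggregations, aggregation))
    |> Enum.sort_by(& &1.start_time)
    |> Enum.map(fn window ->
      midpoint = div(window.start_time + window.end_time, 2)
      {midpoint, Map.fetch!(window.aggregations, aggregation)}
    end)
  end
end
```

With the `windows` list from the example above, `WindowSeriesSketch.series(windows, :avg)` would yield points suitable for a line chart.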

handle_manager_cast(msg, state)

Callback implementation for Raxol.Core.Behaviours.BaseManager.handle_manager_cast/2.

start_link(init_opts \\ [])

start_stream(stream_id)

@spec start_stream(stream_id()) :: :ok | {:error, term()}

Starts data streaming from the configured source.

Examples

:ok = StreamingData.start_stream(stream_id)

stop_stream(stream_id)

@spec stop_stream(stream_id()) :: :ok | {:error, term()}

Stops data streaming and closes connections.

Examples

:ok = StreamingData.stop_stream(stream_id)

update_stream_config(stream_id, config_updates)

@spec update_stream_config(stream_id(), map()) :: :ok | {:error, term()}

Updates stream configuration dynamically.

Examples

# Increase buffer size during high load
:ok = StreamingData.update_stream_config(stream_id, %{
  buffer_size: 2000,
  sampling_rate: 0.5  # Keep 50% of data points
})
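
Dynamic updates pair naturally with the statistics from `get_stream_stats/1`. The sketch below derives a `config_updates` map from the `buffer_usage` statistic. `AdaptiveConfigSketch` and its thresholds (0.9 and 0.25) are hypothetical, and it assumes `sampling_rate` is the fraction of points kept, as the comments in the Usage section indicate, so that lowering it reduces load.

```elixir
defmodule AdaptiveConfigSketch do
  @moduledoc "Hypothetical tuning policy; not part of Raxol."

  # Returns a map of config updates (possibly empty) based on how full
  # the buffer is. Thresholds are arbitrary illustration values.
  def suggest_updates(%{buffer_usage: usage}, %{buffer_size: size, sampling_rate: rate}) do
    cond do
      # Under pressure: double the buffer and keep half as many points.
      usage >= 0.9 -> %{buffer_size: size * 2, sampling_rate: max(rate / 2, 0.01)}
      # Plenty of headroom: keep more points, capped at 100%.
      usage <= 0.25 -> %{sampling_rate: min(rate * 2, 1.0)}
      # Otherwise leave the configuration alone.
      true -> %{}
    end
  end
end
```

A non-empty result could then be passed as the second argument to `StreamingData.update_stream_config/2`.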