Pipeline.Test.AsyncMocks (PipelineEx v0.1.1)

Mock support for testing async streaming functionality.

This module provides utilities for creating mock async streams with configurable behavior, timing patterns, and error injection. It integrates with the existing mock system, so async streaming features can be tested alongside the other mock providers.

Features

  • Configurable streaming patterns (fast, slow, chunked)
  • Deterministic timing simulation
  • Error injection at specific points
  • Integration with existing mock providers
  • Support for various message types

Example

# Alias the module so the short name used below resolves
alias Pipeline.Test.AsyncMocks

# Create a simple mock stream
stream = AsyncMocks.create_stream("test content")

# Create a slow stream with delays
slow_stream = AsyncMocks.create_stream("test", pattern: :slow)

# Create a stream that errors after 3 messages
error_stream = AsyncMocks.create_stream("test",
  error_after: 3,
  error_type: :network_error
)

Summary

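Functions

create_async_response(content, step_name, opts \\ [])

Creates a mock AsyncResponse with streaming behavior.

create_scenario_stream(scenario, opts \\ [])

Creates a predefined mock stream for common test scenarios.

create_stream(content, opts \\ [])

Creates a mock async stream with configurable behavior.

inject_error(stream, after_count, error_type)

Injects an error into an existing stream at a specific point.

reset_async_mocks()

Clears all async mock configurations.

setup_async_mock(pattern, opts \\ [])

Sets up mock provider responses for async streaming.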

Types

error_type()

@type error_type() ::
  :network_error | :timeout | :invalid_message | :stream_interrupted

mock_options()

@type mock_options() :: %{
  pattern: streaming_pattern(),
  delay_ms: non_neg_integer(),
  chunk_size: pos_integer(),
  error_after: pos_integer() | nil,
  error_type: error_type() | nil,
  include_metadata: boolean(),
  total_tokens: non_neg_integer()
}

scenario()

@type scenario() ::
  :simple | :code_generation | :analysis | :error | :timeout | :empty

streaming_pattern()

@type streaming_pattern() :: :fast | :slow | :chunked | :realistic

Functions

create_async_response(content, step_name, opts \\ [])

@spec create_async_response(String.t(), String.t(), keyword()) :: map()

Creates a mock AsyncResponse with streaming behavior.

DEPRECATED: The AsyncResponse module has been removed.

create_scenario_stream(scenario, opts \\ [])

@spec create_scenario_stream(
  scenario(),
  keyword()
) :: Enumerable.t()

Creates a predefined mock stream for common test scenarios.

Scenarios

  • :simple - Basic successful stream
  • :code_generation - Mock code generation response
  • :analysis - Mock analysis response with multiple sections
  • :error - Stream that errors partway through
  • :timeout - Stream that times out
  • :empty - Stream with no content

create_stream(content, opts \\ [])

@spec create_stream(
  String.t(),
  keyword()
) :: Enumerable.t()

Creates a mock async stream with configurable behavior.

Options

  • :pattern - Streaming pattern to use (default: :fast)
    • :fast - No delays between messages
    • :slow - 50ms delay between messages
    • :chunked - Groups messages in chunks with delays
    • :realistic - Variable delays simulating real network behavior
  • :delay_ms - Override default delay for pattern (milliseconds)
  • :chunk_size - Number of messages per chunk for :chunked pattern
  • :error_after - Inject error after N messages
  • :error_type - Type of error to inject
  • :include_metadata - Include additional metadata in messages
  • :total_tokens - Total token count to simulate
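
To make the interaction between :pattern, :delay_ms, and :chunk_size concrete, here is a minimal standard-library sketch of how such timing patterns could be simulated. It is not this module's implementation. PatternSketch is a hypothetical name, messages are assumed to be plain list elements, the :chunked default delay (10 ms) and default chunk size (2) are placeholders chosen for the sketch, and :realistic is left out.

```elixir
defmodule PatternSketch do
  # Hypothetical helper, not part of Pipeline.Test.AsyncMocks.
  # The :fast and :slow defaults follow the option list above; the :chunked
  # default is a placeholder chosen for this sketch.
  @default_delay_ms %{fast: 0, slow: 50, chunked: 10}

  def stream(messages, opts \\ []) do
    pattern = Keyword.get(opts, :pattern, :fast)
    # An explicit :delay_ms wins over the pattern's default delay.
    delay_ms = opts[:delay_ms] || Map.fetch!(@default_delay_ms, pattern)

    case pattern do
      :chunked ->
        # Placeholder default chunk size (assumption).
        chunk_size = Keyword.get(opts, :chunk_size, 2)

        messages
        |> Stream.chunk_every(chunk_size)
        |> Stream.flat_map(fn chunk ->
          # Pause once per chunk, then emit the chunk's messages together.
          pause(delay_ms)
          chunk
        end)

      _other ->
        Stream.map(messages, fn message ->
          # Pause before every message.
          pause(delay_ms)
          message
        end)
    end
  end

  defp pause(0), do: :ok
  defp pause(ms), do: Process.sleep(ms)
end

PatternSketch.stream(["a", "b", "c"], pattern: :chunked, chunk_size: 2, delay_ms: 5)
|> Enum.to_list()
# => ["a", "b", "c"]
```

Because the stream is lazy, the delays are paid only while a consumer enumerates it, which is why a test that uses a slow pattern may want a small :delay_ms override.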

inject_error(stream, after_count, error_type)

@spec inject_error(Enumerable.t(), pos_integer(), error_type()) :: Enumerable.t()

Injects an error into an existing stream at a specific point.

This is useful for testing error handling in stream processors.
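
As an illustration of the idea, the sketch below wraps any enumerable so that an error element appears after a fixed number of items. It uses only Stream.transform/3 from the standard library and is not the module's own code. ErrorInjectionSketch is a hypothetical name, and both the {:error, error_type} element shape and the choice to stop the stream after the error are assumptions.

```elixir
defmodule ErrorInjectionSketch do
  # Hypothetical stand-in for inject_error/3, written with Stream.transform/3.
  # Assumption: the injected error is emitted as an {:error, error_type}
  # element and the stream stops afterwards. The real message shape may differ.
  def inject_error(stream, after_count, error_type)
      when is_integer(after_count) and after_count > 0 do
    Stream.transform(stream, 0, fn
      # The error has already been emitted: stop consuming the source.
      _item, count when count >= after_count ->
        {:halt, count}

      # This is item number after_count: emit it, then the error.
      item, count when count + 1 == after_count ->
        {[item, {:error, error_type}], count + 1}

      # Before the injection point: pass items through unchanged.
      item, count ->
        {[item], count + 1}
    end)
  end
end

1..5
|> ErrorInjectionSketch.inject_error(2, :network_error)
|> Enum.to_list()
# => [1, 2, {:error, :network_error}]
```

In this sketch a source with fewer than after_count items ends normally without an error, so a test that expects the error should feed the stream enough items to reach the injection point.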

reset_async_mocks()

@spec reset_async_mocks() :: :ok

Clears all async mock configurations.

setup_async_mock(pattern, opts \\ [])

@spec setup_async_mock(
  String.t() | Regex.t(),
  keyword()
) :: :ok

Sets up mock provider responses for async streaming.

This configures the mock Claude provider to return async responses.