v0.8.0 - Process Safety

View Source

Overview

This release completes the Robustness Phase by adding process safety mechanisms. The focus is on preventing zombie processes, handling crashes gracefully, and adding timeouts to prevent hangs.

Phase: Robustness | Duration: 2 weeks | Prerequisites: v0.7.0 (bugs fixed, APIs updated)

Objectives

  1. Replace spawn with spawn_link for crash propagation
  2. Add monitors for critical process relationships
  3. Add timeouts to all receive loops
  4. Implement crash recovery strategies
  5. Document process supervision architecture

Process Safety Improvements

1. Replace spawn with spawn_link

Analysis Reference: Section 8.3 of DXNN2_CODEBASE_ANALYSIS.md

Affected Modules:

  • exoself.erl - spawns sensors, neurons, actuators, cortex
  • population_monitor.erl - spawns exoself processes

Problem: If a child process crashes, the parent is not notified and keeps running with invalid state.

Solution:

%% Before: No crash notification.
%% A bare spawn creates neither a link nor a monitor, so the parent never
%% learns that the child died.
Pid = spawn(Module, Function, Args)

%% After: Crash propagates to parent.
%% spawn_link ties the two processes together: an abnormal child exit is
%% delivered to the parent as an exit signal (an {'EXIT', Pid, Reason}
%% message when the parent has set trap_exit).
Pid = spawn_link(Module, Function, Args)

%% Or with monitoring for more control.
%% spawn_monitor returns the monitor reference alongside the pid; the parent
%% is told about the exit with a {'DOWN', MonRef, process, Pid, Reason}
%% message instead of being taken down with the child.
{Pid, MonRef} = spawn_monitor(Module, Function, Args)

exoself.erl Changes:

%% Spawn one linked sensor process per sensor record of this agent.
%%
%% Every sensor is started with spawn_link so that a sensor crash propagates
%% to the calling process, and its pid is registered under the sensor id in
%% the id_to_process_map ETS table.
%%
%% Returns a list of {Pid, SensorId} pairs, in the order the sensors were read.
spawn_sensors(State) ->
    Owner = self(),
    Sensors = genotype:read_sensors(State#exoself_state.agent_id),
    [
        begin
            SensorPid = spawn_link(sensor, gen, [Owner, S]),
            SensorId = S#sensor.id,
            ets:insert(State#exoself_state.id_to_process_map, {SensorId, SensorPid}),
            {SensorPid, SensorId}
        end
        || S <- Sensors
    ].

2. Add Process Monitors for Supervision

%% In exoself.erl - monitor critical processes
%% Spawn the cortex for this agent, linked to and monitored by the caller.
%%
%% The link and the monitor are established atomically by spawn_opt/4.
%% Spawning first and calling monitor/2 afterwards leaves a window in which
%% the cortex can already be dead, in which case the 'DOWN' message would
%% carry `noproc` instead of the real exit reason.
%%
%% Returns {Pid, MonRef}.
spawn_and_monitor_cortex(State) ->
    Cortex = genotype:read_cortex(State#exoself_state.agent_id),
    {Pid, MonRef} = spawn_opt(cortex, gen, [self(), Cortex], [link, monitor]),
    {Pid, MonRef}.

%% Handle monitor messages
%% A monitored process went down: delegate to the crash handler, passing the
%% pid, exit reason and monitor reference taken from the 'DOWN' message.
handle_info({'DOWN', Ref, process, DownPid, ExitReason}, State) ->
    handle_process_crash(DownPid, ExitReason, Ref, State).

%% React to the crash of a network process.
%%
%% Either way the evaluation is aborted: the crash is logged, all network
%% processes are asked to terminate, and a stop tuple is returned. Only the
%% log level and the stop tag differ:
%%   - cortex crash      -> logged as error,   {stop, {cortex_crash, Reason}, State}
%%   - any other process -> logged as warning, {stop, {network_crash, Reason}, State}
%% The monitor reference is accepted but not used.
handle_process_crash(Pid, Reason, _MonRef, State) ->
    CortexPid = State#exoself_state.cortex_pid,
    StopTag =
        if
            Pid == CortexPid ->
                tweann_logger:error("Cortex crashed: ~p", [Reason]),
                cortex_crash;
            true ->
                tweann_logger:warning("Network process crashed: ~p", [Reason]),
                network_crash
        end,
    terminate_all_processes(State),
    {stop, {StopTag, Reason}, State}.

3. Add Timeouts to Receive Loops

Analysis Reference: Section 5.4 of DXNN2_CODEBASE_ANALYSIS.md

Problem: Receive without timeout can hang indefinitely

cortex.erl Changes:

%% Before: No timeout
%% Illustrative "before" version of the cortex loop.
%% The receive has no `after` clause, so if no matching
%% {ActuatorPid, sync, Fitness, EndFlag} message ever arrives the process
%% blocks here indefinitely.
%% `handle_sync(...)` is a placeholder; its arguments are elided in this plan.
loop(State) ->
    receive
        {ActuatorPid, sync, Fitness, EndFlag} ->
            handle_sync(...)
    end.

%% After: With timeout
-define(SYNC_TIMEOUT, 30000).  % 30 seconds

%% Cortex loop with a bounded wait for the actuator sync message.
%%
%% The timeout is read from the cortex state (sync_timeout) rather than from
%% the compile-time ?SYNC_TIMEOUT macro, so that it can be configured per
%% cortex. The timeout tests in this plan wait only 500 ms for the
%% evaluation_timeout notification, which a fixed 30 s macro cannot satisfy.
%% NOTE(review): assumes #cortex_state{} carries a sync_timeout field, as
%% cortex_timeout_configurable_test expects - confirm against the record.
%%
%% `handle_sync(...)` is a placeholder; its arguments are elided in this plan.
loop(State) ->
    receive
        {ActuatorPid, sync, Fitness, EndFlag} ->
            handle_sync(...)
    after State#cortex_state.sync_timeout ->
        handle_sync_timeout(State)
    end.

%% Called when the cortex receives no sync message within its timeout.
%%
%% Logs a warning that includes the actuator pids held in the state, tells the
%% exoself that the evaluation timed out by sending it
%% {CortexPid, evaluation_timeout}, and returns {stop, timeout}.
handle_sync_timeout(State) ->
    Actuators = State#cortex_state.actuator_pids,
    tweann_logger:warning("Cortex sync timeout, agents: ~p", [Actuators]),
    State#cortex_state.exoself_pid ! {self(), evaluation_timeout},
    {stop, timeout}.

neuron.erl Changes:

-define(INPUT_TIMEOUT, 10000).  % 10 seconds per input batch

%% Neuron loop with a bounded wait for the next forwarded input signal.
%%
%% The timeout is read from the neuron state (input_timeout) rather than from
%% the compile-time ?INPUT_TIMEOUT macro, so that it can be configured per
%% neuron. The timeout tests in this plan wait only 500 ms for the
%% neuron_timeout notification, which a fixed 10 s macro cannot satisfy.
%% NOTE(review): assumes #neuron_state{} carries an input_timeout field, as
%% neuron_timeout_default_test expects - confirm against the record.
loop(State) ->
    receive
        {SourcePid, forward, Signal} ->
            handle_input(SourcePid, Signal, State)
    after State#neuron_state.input_timeout ->
        handle_input_timeout(State)
    end.

%% Called when the neuron receives no input within its timeout.
%%
%% Logs the neuron id together with the inputs it is still expecting, sends
%% {NeuronPid, neuron_timeout} to the cortex, and then re-enters loop/1, i.e.
%% the neuron keeps waiting and the receive timeout is armed again.
%% NOTE(review): there is no retry limit here, so a starved neuron will notify
%% the cortex on every timeout - decide whether it should terminate instead.
handle_input_timeout(State) ->
    NeuronId = State#neuron_state.id,
    Pending = State#neuron_state.expected_inputs,
    tweann_logger:warning("Neuron ~p input timeout, waiting for: ~p", [NeuronId, Pending]),
    State#neuron_state.cortex_process_id ! {self(), neuron_timeout},
    loop(State).

4. Implement Termination Propagation

%% In exoself.erl
%% Ask every network process to shut down, then wait for the cortex.
%%
%% A `terminate` message is sent to each pid returned by
%% get_all_network_pids/1. Only the cortex is waited for afterwards
%% (wait_for_termination/2 with a 1000 ms limit).
terminate_all_processes(State) ->
    [NetworkPid ! terminate || NetworkPid <- get_all_network_pids(State)],
    wait_for_termination(State#exoself_state.cortex_pid, 1000).

%% Collect the pids of all network processes held in the exoself state:
%% the cortex first, followed by the sensors, neurons and actuators.
get_all_network_pids(State) ->
    Sensors = State#exoself_state.sensor_process_ids,
    Neurons = State#exoself_state.neuron_process_ids,
    Actuators = State#exoself_state.actuator_process_ids,
    [State#exoself_state.cortex_pid | Sensors ++ Neurons ++ Actuators].

%% Wait up to Timeout milliseconds for Pid to exit; force-kill it otherwise.
%%
%% A temporary monitor is used in addition to matching {'EXIT', Pid, _}.
%% Matching 'EXIT' alone only works when the caller is linked to Pid and is
%% trapping exits; in every other case (including a Pid that is already dead)
%% the call would sit out the full timeout. With the monitor, a dead or
%% unlinked Pid is detected through its 'DOWN' message.
%%
%% Always returns ok.
wait_for_termination(Pid, Timeout) ->
    MonRef = monitor(process, Pid),
    receive
        {'EXIT', Pid, _Reason} ->
            %% Drop the monitor and any 'DOWN' it may already have delivered.
            demonitor(MonRef, [flush]),
            ok;
        {'DOWN', MonRef, process, Pid, _Reason} ->
            ok
    after Timeout ->
        demonitor(MonRef, [flush]),
        %% Force kill if not responding
        exit(Pid, kill),
        ok
    end.

5. Add Crash Recovery Strategies

%% In population_monitor.erl
%% Handle the crash of an agent in the population monitor.
%%
%% The crash is logged and recorded in the state, then the recovery action is
%% chosen from the category returned by categorize_crash/1:
%%   transient -> the agent's evaluation is retried
%%   permanent -> the agent is marked as failed and the run continues
%%   critical  -> {stop, {critical_failure, AgentId, Reason}, State}
%% Every branch works on the state returned by record_agent_failure/3.
handle_agent_crash(AgentId, Reason, State) ->
    tweann_logger:warning("Agent ~p crashed: ~p", [AgentId, Reason]),
    WithFailure = record_agent_failure(AgentId, Reason, State),
    Category = categorize_crash(Reason),
    case Category of
        critical ->
            {stop, {critical_failure, AgentId, Reason}, WithFailure};
        permanent ->
            mark_agent_failed(AgentId, WithFailure);
        transient ->
            retry_agent_evaluation(AgentId, WithFailure)
    end.

%% Map a crash reason to a recovery category.
%%
%% `timeout` and {database_error, _} are treated as transient; everything
%% else, including {invalid_genotype, _}, is permanent.
%% NOTE(review): no clause returns `critical`, although handle_agent_crash/3
%% has a branch for it - confirm which reasons should be critical.
categorize_crash(timeout) -> transient;
categorize_crash({database_error, _}) -> transient;
categorize_crash({invalid_genotype, _}) -> permanent;
categorize_crash(_Other) -> permanent.

Tests to Write

process_safety_test.erl

-module(process_safety_test).
-include_lib("eunit/include/eunit.hrl").

%% ============================================================================
%% spawn_link tests
%% ============================================================================

child_crash_propagates_test() ->
    %% Parent should receive EXIT when child crashes.
    %% trap_exit is restored afterwards so the flag does not leak into other
    %% tests that run in the same process.
    OldTrapExit = process_flag(trap_exit, true),
    try
        %% The link is set up atomically by spawn_link, so the child can
        %% crash immediately; no sleep is needed.
        ChildPid = spawn_link(fun() -> exit(deliberate_crash) end),

        receive
            {'EXIT', ChildPid, deliberate_crash} ->
                ok
        after 1000 ->
            ?assert(false)
        end
    after
        process_flag(trap_exit, OldTrapExit)
    end.

%% ============================================================================
%% Timeout tests
%% ============================================================================

cortex_sync_timeout_test() ->
    %% Start a test cortex through the helper, passing 100 as its timeout
    Cortex = test_helpers:spawn_test_cortex_with_timeout(100),

    %% No actuator sync is ever sent, so the cortex is expected to report
    %% an evaluation timeout to this process
    receive
        {Cortex, evaluation_timeout} -> ok
    after 500 ->
        ?assert(false)
    end.

neuron_input_timeout_test() ->
    %% Start a test neuron through the helper, passing 100 as its timeout
    Neuron = test_helpers:spawn_test_neuron_with_timeout(100),

    %% The expected inputs are withheld, so the neuron is expected to
    %% report a timeout to this process
    receive
        {Neuron, neuron_timeout} -> ok
    after 500 ->
        ?assert(false)
    end.

%% ============================================================================
%% Monitor tests
%% ============================================================================

monitor_receives_down_test() ->
    %% spawn_monitor creates the process and the monitor atomically. With a
    %% separate spawn + monitor the child could exit first, and the 'DOWN'
    %% reason would then be noproc instead of normal.
    {Pid, MonRef} = spawn_monitor(fun() -> exit(normal) end),

    receive
        {'DOWN', MonRef, process, Pid, normal} ->
            ok
    after 1000 ->
        ?assert(false)
    end.

exoself_handles_cortex_crash_test() ->
    %% Bring up a test exoself and look up the cortex it owns
    {ok, Exoself} = test_helpers:start_test_exoself(),
    Cortex = test_helpers:get_cortex_pid(Exoself),

    %% Kill the cortex out from under the exoself
    exit(Cortex, kill),

    %% The exoself must report that it terminated because of the cortex crash
    receive
        {Exoself, terminated, {cortex_crash, _}} -> ok
    after 1000 ->
        ?assert(false)
    end.

%% ============================================================================
%% Termination propagation tests
%% ============================================================================

terminate_propagates_to_network_test() ->
    %% Create network
    {ExoselfPid, NetworkPids} = test_helpers:create_test_network(),

    %% Monitor every network process BEFORE asking the exoself to terminate,
    %% so each exit is observed through a 'DOWN' message instead of guessing
    %% with a fixed sleep.
    Monitors = [{Pid, monitor(process, Pid)} || Pid <- NetworkPids],

    %% Terminate exoself
    ExoselfPid ! terminate,

    %% All network processes should be terminated
    lists:foreach(
        fun({Pid, MonRef}) ->
            receive
                {'DOWN', MonRef, process, Pid, _Reason} ->
                    ?assertNot(is_process_alive(Pid))
            after 1000 ->
                ?assert(false)
            end
        end,
        Monitors
    ).

%% ============================================================================
%% Crash recovery tests
%% ============================================================================

agent_crash_retry_test() ->
    %% Start a test population monitor and pick its first agent
    {ok, Monitor} = test_helpers:start_test_population_monitor(),
    AgentId = test_helpers:get_first_agent_id(Monitor),

    %% Provoke a timeout for that agent (a transient failure)
    test_helpers:cause_agent_timeout(AgentId),

    %% A retry notification for the same agent is expected
    receive
        {agent_retry, AgentId} -> ok
    after 5000 ->
        ?assert(false)
    end.

agent_permanent_failure_test() ->
    %% Start a test population monitor and create an agent with an invalid genotype
    {ok, Monitor} = test_helpers:start_test_population_monitor(),
    {ok, BadAgentId} = test_helpers:create_invalid_agent(),

    %% Ask the monitor to start the broken agent
    gen_server:cast(Monitor, {start_agent, BadAgentId}),

    %% The agent is expected to be reported as failed
    receive
        {agent_failed, BadAgentId} -> ok
    after 5000 ->
        ?assert(false)
    end.

timeout_configuration_test.erl

-module(timeout_configuration_test).
-include_lib("eunit/include/eunit.hrl").

%% ============================================================================
%% Timeout configuration tests
%% ============================================================================

cortex_timeout_configurable_test() ->
    %% A sync_timeout given in the config map must end up in the cortex state
    Timeout = 5000,
    State = cortex:init_state(#{sync_timeout => Timeout}),
    ?assertEqual(Timeout, State#cortex_state.sync_timeout).

neuron_timeout_default_test() ->
    %% Should have reasonable default
    State = neuron:init_state(#{}),
    Timeout = State#neuron_state.input_timeout,
    %% Check the type first: in Erlang's term order every atom compares
    %% greater than every number, so `undefined > 0` is true and a bare
    %% `> 0` check would pass even when no default was set.
    ?assert(is_integer(Timeout) andalso Timeout > 0).

%% ============================================================================
%% Timeout behavior tests
%% ============================================================================

short_timeout_triggers_early_test() ->
    %% A 50 ms receive timeout should fire in under 100 ms, as measured
    %% through the time_utils helpers
    StartedAt = time_utils:timestamp(),

    receive
    after 50 ->
        Elapsed = time_utils:elapsed_ms(StartedAt),
        ?assert(Elapsed < 100)
    end.

Documentation Requirements

Required Documentation

  1. Process architecture

    • Process relationships
    • Supervision strategy
    • Crash propagation
  2. Timeout configuration

    • Default values
    • How to configure
    • Timeout handling
  3. Recovery strategies

    • Transient vs permanent failures
    • Retry logic
    • Graceful degradation

Documentation Checklist

  • [ ] Process supervision diagram
  • [ ] Timeout values and rationale
  • [ ] Crash recovery documented
  • [ ] All safety patterns documented

Quality Gates

v0.8.0 Acceptance Criteria

  1. Process Linking

    • [ ] All spawns use spawn_link or spawn_monitor
    • [ ] EXIT messages handled properly
    • [ ] No zombie processes possible
  2. Timeouts

    • [ ] All receive loops have timeouts
    • [ ] Timeout values are configurable
    • [ ] Timeout handling is documented
  3. Monitoring

    • [ ] Critical processes monitored
    • [ ] DOWN messages handled
    • [ ] Clean shutdown on crash
  4. Recovery

    • [ ] Transient failures retried
    • [ ] Permanent failures recorded
    • [ ] System remains stable
  5. Tests

    • [ ] Crash propagation tested
    • [ ] Timeout handling tested
    • [ ] Recovery strategies tested

Known Limitations

  • No supervision tree (OTP supervisor)
  • Manual retry logic
  • Fixed retry count

Next Steps

After v0.8.0 completion:

  1. v0.9.0 begins Performance Phase
  2. Robustness Phase complete
  3. Focus shifts to optimization

Implementation Notes

Process Supervision Diagram

population_monitor (gen_server)
    |
    +-- exoself_1 (linked)
    |       |
    |       +-- cortex (linked + monitored)
    |       +-- sensor_1 (linked)
    |       +-- sensor_2 (linked)
    |       +-- neuron_1 (linked)
    |       +-- neuron_2 (linked)
    |       +-- actuator_1 (linked)
    |
    +-- exoself_2 (linked)
            |
            +-- ... (same structure)

Timeout Configuration Pattern

%% In configuration
-define(DEFAULT_SYNC_TIMEOUT, 30000).
-define(DEFAULT_INPUT_TIMEOUT, 10000).
-define(DEFAULT_EVALUATION_TIMEOUT, 60000).

%% Configurable via state
%% Build the initial state from a config map.
%%
%% sync_timeout and input_timeout are taken from Config when present and fall
%% back to ?DEFAULT_SYNC_TIMEOUT / ?DEFAULT_INPUT_TIMEOUT otherwise.
init_state(Config) ->
    SyncTimeout = maps:get(sync_timeout, Config, ?DEFAULT_SYNC_TIMEOUT),
    InputTimeout = maps:get(input_timeout, Config, ?DEFAULT_INPUT_TIMEOUT),
    #state{sync_timeout = SyncTimeout, input_timeout = InputTimeout}.

Safe Termination Pattern

%% Shut down a list of processes: ask politely first, then force.
%%
%% 1. Send `terminate` to every pid.
%% 2. Wait for each pid in turn with wait_for_exit/2 (1000 ms per pid).
%% 3. Kill every pid whose wait ended in `timeout`.
%% Always returns ok.
terminate_safely(Pids) ->
    lists:foreach(fun(P) -> P ! terminate end, Pids),
    Outcomes = lists:map(fun(P) -> wait_for_exit(P, 1000) end, Pids),
    lists:foreach(
        fun
            ({P, timeout}) -> exit(P, kill);
            (_Exited) -> ok
        end,
        Outcomes
    ),
    ok.

%% Wait up to Timeout milliseconds for Pid to exit.
%%
%% Returns {Pid, ok} once the exit has been observed and {Pid, timeout}
%% otherwise.
%%
%% A temporary monitor is used in addition to matching {'EXIT', Pid, _}.
%% Matching 'EXIT' alone only works when the caller is linked to Pid and is
%% trapping exits; a Pid that is unlinked or already dead would otherwise be
%% reported as `timeout` after the full wait.
wait_for_exit(Pid, Timeout) ->
    MonRef = monitor(process, Pid),
    receive
        {'EXIT', Pid, _} ->
            %% Drop the monitor and any 'DOWN' it may already have delivered.
            demonitor(MonRef, [flush]),
            {Pid, ok};
        {'DOWN', MonRef, process, Pid, _} ->
            {Pid, ok}
    after Timeout ->
        demonitor(MonRef, [flush]),
        {Pid, timeout}
    end.

Dependencies

External Dependencies

  • OTP process primitives

Internal Dependencies

  • v0.7.0: error handling, logging
  • All previously refactored modules

Effort Estimate

| Task                   | Estimate |
|------------------------|----------|
| spawn_link migration   | 1.5 days |
| Monitor implementation | 1.5 days |
| Timeout addition       | 2 days   |
| Recovery strategies    | 2 days   |
| Safety tests           | 2 days   |
| Documentation          | 1 day    |
| Total                  | 10 days  |

Risks

| Risk            | Mitigation                 |
|-----------------|----------------------------|
| Cascade crashes | Careful trap_exit handling |
| Timeout tuning  | Configurable values        |
| Recovery loops  | Retry limits               |

Version: 0.8.0 | Phase: Robustness | Status: Planned