network_pubsub (macula_tweann v0.18.1)

View Source

Internal pub/sub for neural network component communication using pg.

This module provides a thin wrapper around OTP's pg (process groups) for communication between network components (cortex, sensors, neurons, actuators). It uses pg's built-in group management with network-specific naming conventions.

Design Philosophy

Network components communicate through events rather than direct calls: - Publishers don't need to know subscriber PIDs - Subscribers don't need to know publisher PIDs - New observers can be added without modifying existing code - Events form a clear contract between components

Event Types

| Event | Publisher | Description | |-------|-----------|-------------| | evaluation_cycle_started | cortex | When sync is triggered | | sensor_output_ready | sensor | When sensor produces output | | neuron_output_ready | neuron | When neuron fires | | actuator_output_ready | actuator | When actuator produces output | | backup_requested | cortex | When weight backup is needed | | network_terminating | cortex | When network is shutting down |

Usage

Initialize pubsub for a network (typically in cortex): network_pubsub:init(NetworkId)

Subscribe to events: network_pubsub:subscribe(NetworkId, evaluation_cycle_started) network_pubsub:subscribe(NetworkId, [sensor_output_ready, neuron_output_ready])

Publish events: network_pubsub:publish(NetworkId, evaluation_cycle_started, #{cycle => 1}) network_pubsub:publish(NetworkId, sensor_output_ready, #{from => SensorPid, signal => Signal})

Receive events in subscriber: receive {network_event, evaluation_cycle_started, Data} -> %% Handle evaluation start ... end

Implementation

Uses OTP pg (process groups) with group names of the form: {network_pubsub, NetworkId, Topic}

This allows multiple network instances to have independent pubsub.

Summary

Functions

Cleanup pubsub for a network instance.

Get list of subscribers for a topic.

Initialize pubsub for a network instance.

List all topics with active subscriptions for a network.

Publish an event to all subscribers.

Subscribe calling process to event type(s).

Subscribe a specific process to an event type.

Unsubscribe calling process from event type(s).

Unsubscribe a specific process from an event type.

Types

event_data/0

-type event_data() :: map().

event_type/0

-type event_type() ::
          evaluation_cycle_started | sensor_output_ready | neuron_output_ready | actuator_output_ready |
          backup_requested | weights_backed_up | network_terminating |
          atom().

Allow custom events

network_id/0

-type network_id() :: term().

Functions

cleanup(NetworkId)

-spec cleanup(network_id()) -> ok.

Cleanup pubsub for a network instance.

Removes all subscriptions for the current process related to this network. Typically called during network shutdown.

get_subscribers(NetworkId, Topic)

-spec get_subscribers(network_id(), event_type()) -> [pid()].

Get list of subscribers for a topic.

init(NetworkId)

-spec init(network_id()) -> ok.

Initialize pubsub for a network instance.

This ensures the pg scope is started. Safe to call multiple times. Typically called from cortex during initialization.

list_topics(NetworkId)

-spec list_topics(network_id()) -> [event_type()].

List all topics with active subscriptions for a network.

publish(NetworkId, Topic, Data)

-spec publish(network_id(), event_type(), event_data()) -> ok.

Publish an event to all subscribers.

Sends {network_event, Topic, Data} to all processes subscribed to this topic. This is asynchronous - returns immediately after sending.

subscribe(NetworkId, Topics)

-spec subscribe(network_id(), event_type() | [event_type()]) -> ok.

Subscribe calling process to event type(s).

subscribe(NetworkId, Topic, Pid)

-spec subscribe(network_id(), event_type(), pid()) -> ok.

Subscribe a specific process to an event type.

unsubscribe(NetworkId, Topics)

-spec unsubscribe(network_id(), event_type() | [event_type()]) -> ok.

Unsubscribe calling process from event type(s).

unsubscribe(NetworkId, Topic, Pid)

-spec unsubscribe(network_id(), event_type(), pid()) -> ok.

Unsubscribe a specific process from an event type.