brain_pubsub (macula_tweann v0.18.1)


Internal pub/sub for brain subsystem communication using pg.

This module provides a thin wrapper around OTP's pg (process groups) for communication between brain subsystems. It uses pg's built-in group management with brain-specific naming conventions.

Design Philosophy

Brain subsystems communicate through events rather than direct calls:

- Publishers don't know about subscribers
- Subscribers don't know about publishers
- New subsystems can be added without modifying existing code
- Events form a clear contract between subsystems

Event Types

| Event | Publisher | Description |
|-------|-----------|-------------|
| evaluated | brain | After each forward propagation |
| reward_received | brain_system | When reward signal arrives |
| weights_updated | brain_learner | After plasticity applied |
| learning_toggled | brain_system | Learning enabled/disabled |

Usage

Initialize pubsub for a brain (typically in brain_sup):

    brain_pubsub:init(BrainId)

Subscribe to events:

    brain_pubsub:subscribe(BrainId, evaluated)
    brain_pubsub:subscribe(BrainId, [evaluated, reward_received])

Publish events:

    brain_pubsub:publish(BrainId, evaluated, #{activations => Acts})

Receive events in subscriber:

    receive
        {brain_event, evaluated, Data} ->
            %% Handle evaluation event
            ...
    end
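The receive pattern above can be extended into a small listener process. The following is a minimal sketch, not the library's own code: the module name `brain_event_listener_sketch`, the `Owner` pid, and the `{seen, ...}` reply tuples are hypothetical and exist only so the handling can be observed. Only the `{brain_event, Topic, Data}` message shape comes from this page.

```erlang
-module(brain_event_listener_sketch).
-export([start/1]).

%% Spawn a listener that reports each brain event it sees to Owner.
%% Owner and the {seen, ...} replies are illustrative assumptions.
start(Owner) ->
    spawn(fun() -> loop(Owner) end).

loop(Owner) ->
    receive
        %% Specific handling for the documented 'evaluated' event.
        {brain_event, evaluated, Data} ->
            Owner ! {seen, evaluated, maps:get(activations, Data, undefined)},
            loop(Owner);
        %% Catch-all for any other topic, including custom atoms.
        {brain_event, Topic, _Data} ->
            Owner ! {seen, Topic, ignored},
            loop(Owner);
        stop ->
            ok
    end.
```

In real use, such a process would presumably call `brain_pubsub:subscribe(BrainId, [evaluated, reward_received])` before entering its loop, so that published events reach its mailbox.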

Implementation

Uses OTP pg (process groups) with group names of the form:

    {brain_pubsub, BrainId, Topic}

This allows multiple brain instances to have independent pubsub.
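To illustrate why this naming keeps brains independent, here is a minimal sketch that uses pg directly. It is not the module's actual implementation: the scope name `brain_pubsub_groups_sketch` and the helper names are assumptions, since this page does not say which pg scope the library uses.

```erlang
-module(brain_pg_groups_sketch).
-export([demo/0]).

%% Hypothetical scope name; the real scope is not documented here.
-define(SCOPE, brain_pubsub_groups_sketch).

%% Group name in the documented form {brain_pubsub, BrainId, Topic}.
group(BrainId, Topic) -> {brain_pubsub, BrainId, Topic}.

demo() ->
    ok = ensure_scope(),
    %% Join the 'evaluated' group of brain_a only.
    ok = pg:join(?SCOPE, group(brain_a, evaluated), self()),
    InA = pg:get_members(?SCOPE, group(brain_a, evaluated)),
    %% The same topic under another brain id is a different group.
    InB = pg:get_members(?SCOPE, group(brain_b, evaluated)),
    ok = pg:leave(?SCOPE, group(brain_a, evaluated), self()),
    {InA, InB}.

%% Start the scope if needed; an already running scope is fine.
ensure_scope() ->
    case pg:start(?SCOPE) of
        {ok, _Pid} -> ok;
        {error, {already_started, _Pid}} -> ok
    end.
```

Because the brain id is part of the group name, a subscription for `brain_a` never appears among the members of the corresponding `brain_b` group.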

Summary

Functions

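cleanup(BrainId)

Clean up pubsub for a brain instance.

get_subscribers(BrainId, Topic)

Get list of subscribers for a topic.

init(BrainId)

Initialize pubsub for a brain instance.

list_topics(BrainId)

List all topics with active subscriptions for a brain.

publish(BrainId, Topic, Data)

Publish an event to all subscribers.

subscribe(BrainId, Topics)

Subscribe calling process to event type(s).

subscribe(BrainId, Topic, Pid)

Subscribe a specific process to an event type.

unsubscribe(BrainId, Topics)

Unsubscribe calling process from event type(s).

unsubscribe(BrainId, Topic, Pid)

Unsubscribe a specific process from an event type.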

Types

brain_id/0

-type brain_id() :: term().

event_data/0

-type event_data() :: map().

event_type/0

-type event_type() ::
          evaluated | reward_received | weights_updated | learning_toggled | experience_recorded |
          weights_requested | weights_response |
          atom().

The trailing atom() alternative allows custom event types in addition to the named ones.

Functions

cleanup(BrainId)

-spec cleanup(brain_id()) -> ok.

Clean up pubsub for a brain instance.

Removes all subscriptions for the current process related to this brain. Typically called during brain shutdown.
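One way such a cleanup could work on top of pg is sketched below. This is an assumption about the approach, not the library's code: the scope name `brain_pubsub_cleanup_sketch` and the helper `leave_all/2` are hypothetical. The sketch finds every group belonging to the brain and removes the given process from each.

```erlang
-module(brain_pg_cleanup_sketch).
-export([demo/0, leave_all/2]).

%% Hypothetical scope name used only for this sketch.
-define(SCOPE, brain_pubsub_cleanup_sketch).

%% Leave every {brain_pubsub, BrainId, _} group; groups the process
%% never joined make pg:leave/3 return not_joined, which is ignored.
leave_all(BrainId, Pid) ->
    Groups = [G || {brain_pubsub, Id, _Topic} = G <- pg:which_groups(?SCOPE),
                   Id =:= BrainId],
    lists:foreach(fun(G) -> _ = pg:leave(?SCOPE, G, Pid) end, Groups),
    ok.

demo() ->
    ok = ensure_scope(),
    ok = pg:join(?SCOPE, {brain_pubsub, brain_a, evaluated}, self()),
    ok = pg:join(?SCOPE, {brain_pubsub, brain_a, weights_updated}, self()),
    ok = pg:join(?SCOPE, {brain_pubsub, brain_b, evaluated}, self()),
    ok = leave_all(brain_a, self()),
    Result = {pg:get_members(?SCOPE, {brain_pubsub, brain_a, evaluated}),
              pg:get_members(?SCOPE, {brain_pubsub, brain_a, weights_updated}),
              pg:get_members(?SCOPE, {brain_pubsub, brain_b, evaluated})},
    %% Tidy up the remaining membership so the demo can be rerun.
    ok = leave_all(brain_b, self()),
    Result.

ensure_scope() ->
    case pg:start(?SCOPE) of
        {ok, _Pid} -> ok;
        {error, {already_started, _Pid}} -> ok
    end.
```

Note that pg also drops a process from all its groups when the process exits, so an explicit cleanup mainly matters for processes that outlive the brain they were subscribed to.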

get_subscribers(BrainId, Topic)

-spec get_subscribers(brain_id(), event_type()) -> [pid()].

Get list of subscribers for a topic.

init(BrainId)

-spec init(brain_id()) -> ok.

Initialize pubsub for a brain instance.

This ensures the pg scope is started. Safe to call multiple times. Typically called from brain_sup during initialization.

list_topics(BrainId)

-spec list_topics(brain_id()) -> [event_type()].

List all topics with active subscriptions for a brain.
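As a rough illustration, topics with active subscriptions can be derived from the pg group names. The sketch below is an assumption about how this might be done, with a hypothetical scope name `brain_pubsub_topics_sketch`; it relies on `pg:which_groups/1` listing only groups that currently have members.

```erlang
-module(brain_pg_topics_sketch).
-export([topics/1, demo/0]).

%% Hypothetical scope name used only for this sketch.
-define(SCOPE, brain_pubsub_topics_sketch).

%% Extract the Topic element of every group owned by BrainId.
topics(BrainId) ->
    [Topic || {brain_pubsub, Id, Topic} <- pg:which_groups(?SCOPE),
              Id =:= BrainId].

demo() ->
    ok = ensure_scope(),
    Groups = [{brain_pubsub, brain_a, evaluated},
              {brain_pubsub, brain_a, reward_received},
              {brain_pubsub, brain_b, evaluated}],
    lists:foreach(fun(G) -> ok = pg:join(?SCOPE, G, self()) end, Groups),
    %% Sort because pg does not promise any group ordering.
    Topics = lists:sort(topics(brain_a)),
    lists:foreach(fun(G) -> ok = pg:leave(?SCOPE, G, self()) end, Groups),
    Topics.

ensure_scope() ->
    case pg:start(?SCOPE) of
        {ok, _Pid} -> ok;
        {error, {already_started, _Pid}} -> ok
    end.
```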

publish(BrainId, Topic, Data)

-spec publish(brain_id(), event_type(), event_data()) -> ok.

Publish an event to all subscribers.

Sends {brain_event, Topic, Data} to all processes subscribed to this topic. The call is asynchronous: it returns as soon as the messages have been sent, without waiting for subscribers to handle them.
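The fan-out described here can be sketched with pg as follows. This is a minimal illustration under assumptions, not the library's source: the scope name `brain_pubsub_publish_sketch` is hypothetical, and the local `publish/3` merely mirrors the documented signature.

```erlang
-module(brain_pg_publish_sketch).
-export([publish/3, demo/0]).

%% Hypothetical scope name used only for this sketch.
-define(SCOPE, brain_pubsub_publish_sketch).

%% Send {brain_event, Topic, Data} to every member of the topic group.
%% Message sends do not block, so this returns without waiting for
%% any subscriber to process the event.
publish(BrainId, Topic, Data) ->
    Members = pg:get_members(?SCOPE, {brain_pubsub, BrainId, Topic}),
    lists:foreach(fun(Pid) -> Pid ! {brain_event, Topic, Data} end, Members),
    ok.

demo() ->
    ok = ensure_scope(),
    Group = {brain_pubsub, brain_a, evaluated},
    ok = pg:join(?SCOPE, Group, self()),
    ok = publish(brain_a, evaluated, #{activations => [0.1, 0.9]}),
    Result = receive
                 {brain_event, evaluated, Data} -> {ok, Data}
             after 1000 ->
                 timeout
             end,
    ok = pg:leave(?SCOPE, Group, self()),
    Result.

ensure_scope() ->
    case pg:start(?SCOPE) of
        {ok, _Pid} -> ok;
        {error, {already_started, _Pid}} -> ok
    end.
```

Publishing to a topic with no subscribers is a no-op in this sketch, since `pg:get_members/2` returns an empty list for an unknown group.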

subscribe(BrainId, Topics)

-spec subscribe(brain_id(), event_type() | [event_type()]) -> ok.

Subscribe calling process to event type(s).

subscribe(BrainId, Topic, Pid)

-spec subscribe(brain_id(), event_type(), pid()) -> ok.

Subscribe a specific process to an event type.

unsubscribe(BrainId, Topics)

-spec unsubscribe(brain_id(), event_type() | [event_type()]) -> ok.

Unsubscribe calling process from event type(s).

unsubscribe(BrainId, Topic, Pid)

-spec unsubscribe(brain_id(), event_type(), pid()) -> ok.

Unsubscribe a specific process from an event type.