Scenic.PubSub (Scenic v0.11.0-beta.0)

Scenic.PubSub is a combination pub/sub server and data cache.

It is intended to be the interface between sensors (or other data sources) and Scenic scenes.

Why Scenic.PubSub

Sensors (or other data sources) and scenes often need to communicate, but tend to operate on different timelines.

Some sensors update fairly slowly or don't behave well when asked to get data at random times by multiple clients. Scenic.PubSub is backed by a GenServer that collects data from a data source in a well-behaved manner, yet is able to serve that data on demand or by subscription to many clients.

Global Scope

It is important to note that Scenic.PubSub is global in scope. In other words, anything published into Scenic.PubSub is visible to all ViewPorts and Scenes.

Registering Data Sources

Before a process can start publishing data from a source, it must register a source id with Scenic.PubSub. This source id should be an atom. Registration prevents other processes from stepping on that data and alerts any subscribing processes that the data is coming online.

  Scenic.PubSub.register( source_id )

The source_id parameter must be an atom that names the sensor. Subscribers will use this id to request data or subscriptions to the source.

You can also unregister data sources if they are no longer available.

  Scenic.PubSub.unregister( source_id )

Simply exiting the data source process also cleans up its registration.

Publishing Data

When a sensor process publishes data, two things happen. First, that data is cached in an :ets table so that future requests for that data from scenes happen quickly and don't need to bother the data source process. Second, any processes that have subscribed to that source are sent a message containing the new data.

  Scenic.PubSub.publish( source_id, value )

The source_id parameter must be the atom that was previously registered.

The value parameter can be anything that makes sense for the data source.
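
As a minimal sketch of how registering and publishing fit together, the GenServer below registers itself in init/1 and then publishes a reading once per second. The module name, the :temperature source id, the interval, and the read_sensor/0 stub are all assumptions made for illustration; only register/2 and publish/2 come from this module.

```elixir
defmodule MyApp.Sensor.Temperature do
  @moduledoc "Hypothetical data source that publishes a :temperature reading once per second."
  use GenServer

  @source_id :temperature
  @interval_ms 1_000

  def start_link(opts \\ []) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  @impl true
  def init(_opts) do
    # Register this process as the source before publishing anything.
    opts = [version: "0.1.0", description: "Example temperature sensor"]

    case Scenic.PubSub.register(@source_id, opts) do
      {:ok, @source_id} ->
        Process.send_after(self(), :measure, @interval_ms)
        {:ok, %{}}

      {:error, :already_registered} ->
        {:stop, :already_registered}
    end
  end

  @impl true
  def handle_info(:measure, state) do
    # Cache the new value and notify every subscriber.
    Scenic.PubSub.publish(@source_id, read_sensor())
    Process.send_after(self(), :measure, @interval_ms)
    {:noreply, state}
  end

  # Stand-in for real hardware access (assumption): always returns 21.5.
  defp read_sensor, do: 21.5
end
```

Because the registration belongs to the GenServer process, stopping or crashing the process also removes the registration, as described above.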

Subscribing to a Data Source

Scenes (or any other process) can subscribe to a data source. They will receive messages when the source updates its data, comes online, or goes away.

  Scenic.PubSub.subscribe( source_id )

The source_id parameter is the atom registered for the data source. Note that the named source does NOT need to be registered when a listening process subscribes to it. When the source process eventually registers and starts publishing data, the listening subscribers will be notified.

The subscribing process will then start receiving messages that can be handled with handle_info/2.

event                 message sent to subscribers
data published        {{Scenic.PubSub, :data}, {source_id, value, timestamp}}
source registered     {{Scenic.PubSub, :registered}, {source_id, opts}}
source unregistered   {{Scenic.PubSub, :unregistered}, source_id}
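
The three messages can be matched directly in handle_info/2. The following is a sketch only, written as a plain GenServer rather than a scene; the module name, the :temperature source id, and the shape of the state map are assumptions.

```elixir
defmodule MyApp.TemperatureWatcher do
  @moduledoc "Hypothetical subscriber that tracks the latest :temperature value."
  use GenServer

  def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, opts)

  @impl true
  def init(_opts) do
    # Subscribing works even if :temperature has not been registered yet.
    :ok = Scenic.PubSub.subscribe(:temperature)
    {:ok, %{online: false, value: nil, updated_at: nil}}
  end

  # New data was published: remember the value and its timestamp.
  @impl true
  def handle_info({{Scenic.PubSub, :data}, {:temperature, value, timestamp}}, state) do
    {:noreply, %{state | value: value, updated_at: timestamp}}
  end

  # The source came online.
  def handle_info({{Scenic.PubSub, :registered}, {:temperature, _opts}}, state) do
    {:noreply, %{state | online: true}}
  end

  # The source went away.
  def handle_info({{Scenic.PubSub, :unregistered}, :temperature}, state) do
    {:noreply, %{state | online: false}}
  end

  # Ignore anything else that lands in the mailbox.
  def handle_info(_msg, state), do: {:noreply, state}
end
```

Matching on the source id inside each clause keeps one process free to subscribe to several sources and handle each in its own clause.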

Scenes can also unsubscribe if they are no longer interested in updates.

  Scenic.PubSub.unsubscribe( source_id )

Other functions

Any process can get data from a source on demand, whether or not it is a subscriber.

  Scenic.PubSub.get( source_id )
  >> data

Any process can list the currently registered data sources.

  Scenic.PubSub.list()
  >> [{source_id, opts, pid}]
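
These two calls combine naturally. The sketch below walks the list of registered sources and reads the cached value of each one. The MyApp.SourceReport module and its output format are hypothetical; the code assumes list/0 returns {source_id, opts, pid} tuples as shown above and simply inspects whatever get/1 returns.

```elixir
defmodule MyApp.SourceReport do
  @moduledoc "Hypothetical helper that summarizes the registered data sources."

  # Reads the registry and the cached value of every registered source.
  def snapshot do
    for {source_id, opts, _pid} <- Scenic.PubSub.list() do
      summarize(source_id, opts, Scenic.PubSub.get(source_id))
    end
  end

  # Pure formatting step, kept separate so it does not need a running server.
  def summarize(source_id, opts, value) do
    description = Keyword.get(opts, :description, "no description")
    "#{source_id} (#{description}): #{inspect(value)}"
  end
end
```

Neither call touches the data source process, so a report like this can run as often as needed.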

Summary

Functions

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

fetch(source_id)

Retrieve the cached data for a named data source.

get(source_id)

Retrieve the cached data value for a named data source.

get!(source_id)

Retrieve the cached data value for a named data source.

list()

List the registered data sources.

publish(source_id, data)

Publish a data point from a data source.

query(source_id)

Retrieve the full cached data for a named data source.

register(source_id, opts \\ [])

Register the calling process as a data source for the named id.

subscribe(source_id)

Subscribe the calling process to receive events about a data source.

unregister(source_id)

Unregister the calling process as the data source for the named id.

unsubscribe(source_id)

Unsubscribe the calling process from receiving events about a data source.

Functions

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

See Supervisor.

fetch(source_id)

Specs

fetch(source_id :: atom()) :: {:ok, any()} | {:error, :not_found}

Retrieve the cached data for a named data source.

This data is pulled from an :ets table and does not put load on the data source itself.

Parameters

  • source_id an atom that is registered to a data source.

Return Value

  {:ok, {source_id, data, timestamp}}
  • source_id is the atom representing the data source.
  • data is whatever data the data source last published.
  • timestamp is the time - from :os.system_time(:micro_seconds) - the last data was published.

If the data source is either not registered, or has not yet published any data, fetch returns

  {:error, :not_found}

get(source_id)

Specs

get(source_id :: atom()) :: any() | nil

Retrieve the cached data value for a named data source.

This data is pulled from an :ets table and does not put load on the data source itself.

Parameters

  • source_id an atom that is registered to a data source.

Return Value

  data

If the data source is either not registered, or has not yet published any data, get returns

  nil
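
Because get/1 returns nil until data is available, callers usually want a fallback for that case. The sketch below shows one way to do that for a numeric reading. The MyApp.Display module, the :temperature source id, and the "--" placeholder are assumptions for illustration.

```elixir
defmodule MyApp.Display do
  @moduledoc "Hypothetical helper that turns a cached reading into display text."

  # Reads the cached value on demand; no load is placed on the sensor itself.
  def temperature_label, do: format(Scenic.PubSub.get(:temperature))

  # Nothing cached yet: show a placeholder instead of crashing.
  def format(nil), do: "--"

  # Dividing by 1 converts integers to floats so Float.round/2 accepts them.
  def format(value) when is_number(value), do: "#{Float.round(value / 1, 1)} °C"
end
```

A scene could call temperature_label/0 while building its graph and then rely on a subscription for later updates.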

get!(source_id)

Specs

get!(source_id :: atom()) :: any()

Retrieve the cached data value for a named data source.

Raises an error if the data source is not registered.

This data is pulled from an :ets table and does not put load on the data source itself.

Parameters

  • source_id an atom that is registered to a data source.

Return Value

  data

If the data source is not registered, get! raises an error rather than returning nil.

list()

Specs

list() :: [{atom(), Keyword.t(), pid()}]

List the registered data sources.

Return Value

list/0 returns a list of registered data sources

  [{source_id, opts, pid}]
  • source_id is the atom representing the data source.
  • opts options list of metadata about the data source.
    • :version is the version string supplied by the data source during registration.
    • :description is the description string supplied by the data source during registration.
    • :registered_at The system time the data source was registered at.
  • pid is the pid of the data source's process.

publish(source_id, data)

Specs

publish(source_id :: atom(), data :: any()) :: :ok

Publish a data point from a data source.

When a data source uses publish/2 to publish data, that data is recorded in the cache and a

  {{Scenic.PubSub, :data}, {source_id, my_value, timestamp}}

message is sent to each subscriber. The timestamp is the current time in microseconds as returned from :os.system_time(:micro_seconds).

Parameters

  • source_id an atom that is registered to a data source.
  • data the data to publish.

Return Value

On success, returns :ok

It returns {:error, :not_registered} if the caller is not the registered process for the data source.

query(source_id)

Specs

query(source_id :: atom()) :: {:ok, any()} | {:error, :not_found}

Retrieve the full cached data for a named data source.

This data is pulled from an :ets table and does not put load on the data source itself.

Parameters

  • source_id an atom that is registered to a data source.

Return Value

  {:ok, {source_id, data, timestamp}}
  • source_id is the atom representing the data source.
  • data is whatever data the data source last published.
  • timestamp is the time - from :os.system_time(:micro_seconds) - the last data was published.

If the data source is either not registered, or has not yet published any data, query returns

  {:error, :not_found}
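
The timestamp makes it possible to tell how old a cached value is. The following sketch classifies a query/1 result as fresh or stale against a maximum age. The MyApp.Freshness module, its function names, and the return atoms are hypothetical; it assumes the timestamp is in microseconds, as described above, and reads the clock with the :microsecond unit name.

```elixir
defmodule MyApp.Freshness do
  @moduledoc "Hypothetical helper that checks how old a cached value is."

  # Looks up the cached entry and compares its age with max_age_ms.
  def check(source_id, max_age_ms) do
    classify(Scenic.PubSub.query(source_id), :os.system_time(:microsecond), max_age_ms)
  end

  # Pure step: timestamps are in microseconds, the limit is in milliseconds.
  def classify({:ok, {_source_id, data, timestamp}}, now_us, max_age_ms) do
    if now_us - timestamp <= max_age_ms * 1_000 do
      {:fresh, data}
    else
      {:stale, data}
    end
  end

  # Nothing cached for this source.
  def classify({:error, :not_found}, _now_us, _max_age_ms), do: :no_data
end
```

Passing the current time in as an argument keeps classify/3 free of side effects, which makes the age check easy to exercise with fixed values.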

register(source_id, opts \\ [])

Specs

register(source_id :: atom(), opts :: Keyword.t()) ::
  {:ok, atom()} | {:error, :already_registered}

Register the calling process as a data source for the named id.

Parameters

  • source_id the data source being registered.
  • opts optional information about the data source.

Supported options:

  • :version - Data format version

  • :description - Your appropriate description

Return Value

On success, returns {:ok, source_id}

If source_id is already registered to another process, it returns

{:error, :already_registered}

subscribe(source_id)

Specs

subscribe(source_id :: atom()) :: :ok

Subscribe the calling process to receive events about a data source.

The messages the subscriber will start receiving about a data source are:

event                 message sent to subscribers
data published        {{Scenic.PubSub, :data}, {source_id, value, timestamp}}
source registered     {{Scenic.PubSub, :registered}, {source_id, opts}}
source unregistered   {{Scenic.PubSub, :unregistered}, source_id}

Parameters

  • source_id an atom that is registered to a data source.

Return Value

On success, returns :ok

unregister(source_id)

Specs

unregister(source_id :: atom()) :: :ok

Unregister the calling process as the data source for the named id.

Parameters

  • source_id the data source being unregistered.

Return Value

Returns :ok

unsubscribe(source_id)

Specs

unsubscribe(source_id :: atom()) :: :ok

Unsubscribe the calling process from receiving events about a data source.

The caller will stop receiving events about the data source.

Parameters

  • source_id an atom that is registered to a data source.

Return Value

Returns :ok