Scenic.PubSub (Scenic v0.11.2)

Scenic.PubSub is a combination pub/sub server and data cache.

It is intended to be the interface between sensors (or other data sources) and Scenic scenes.

Why Scenic.PubSub

Sensors (or other data sources) and scenes often need to communicate, but tend to operate on different timelines.

Some sensors update fairly slowly or don't behave well when asked to get data at random times by multiple clients. Scenic.PubSub is backed by a GenServer that collects data from a data source in a well-behaved manner, yet is able to serve that data on demand or by subscription to many clients.

Global Scope

It is important to note that Scenic.PubSub is global in scope. In other words, anything published into Scenic.PubSub is visible to all ViewPorts and Scenes.

Registering Data Sources

Before a process can start publishing data from a source, it must register a source id with Scenic.PubSub. This source id should be an atom. This prevents other processes from stepping on that data and alerts any subscribing processes that the data is coming online.

  Scenic.PubSub.register( source_id )

The source_id parameter must be an atom that names the sensor. Subscribers will use this id to request data or subscriptions to the source.

You can also unregister a data source if it is no longer available.

  Scenic.PubSub.unregister( source_id )

Simply exiting the data source process also cleans up its registration.

Publishing Data

When a sensor process publishes data, two things happen. First, that data is cached in an :ets table so that future requests for that data from scenes happen quickly and don't need to bother the data source process. Second, any processes that have subscribed to that source are sent a message containing the new data.

  Scenic.PubSub.publish( source_id, value )

The source_id parameter must be the atom that was previously registered.

The value parameter can be anything that makes sense for the data source.
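
As a minimal sketch of the register-then-publish flow described above, a data source could be wrapped in a GenServer along these lines. The module name Example.TemperatureSensor, the :temperature source id, the one-second interval, and read_temperature/0 are illustrative assumptions and are not part of Scenic.

```elixir
defmodule Example.TemperatureSensor do
  # Hypothetical data source process. The module name, the :temperature id,
  # the interval, and read_temperature/0 are assumptions for illustration.
  use GenServer

  @source_id :temperature
  @interval_ms 1_000

  def start_link(opts \\ []) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  @impl true
  def init(_opts) do
    # Register first so that no other process can publish under this id.
    {:ok, @source_id} = Scenic.PubSub.register(@source_id)
    Process.send_after(self(), :measure, @interval_ms)
    {:ok, %{}}
  end

  @impl true
  def handle_info(:measure, state) do
    # Publishing caches the value and notifies every subscriber.
    Scenic.PubSub.publish(@source_id, read_temperature())
    Process.send_after(self(), :measure, @interval_ms)
    {:noreply, state}
  end

  # Stand-in for real hardware access; returns a made-up reading.
  defp read_temperature, do: 20.0 + :rand.uniform() * 5.0
end
```

Because the registration belongs to the process that called register, the publish call is made from the same GenServer rather than from a helper process.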

Subscribing to a Data Source

Scenes (or any other process) can subscribe to a data source. They will receive messages when the source updates its data, comes online, or goes away.

  Scenic.PubSub.subscribe( source_id )

The source_id parameter is the atom registered for the data source. Note that the named source does NOT need to be registered when a listening process subscribes to it. When the source process eventually registers and starts publishing data, the listening subscribers will be notified.

The subscribing process will then start receiving messages that can be handled with handle_info/2:

event               | message sent to subscribers
--------------------|--------------------------------------------------------
data published      | {{Scenic.PubSub, :data}, {source_id, value, timestamp}}
source registered   | {{Scenic.PubSub, :registered}, {source_id, opts}}
source unregistered | {{Scenic.PubSub, :unregistered}, source_id}

Scenes can also unsubscribe if they are no longer interested in updates.

  Scenic.PubSub.unsubscribe( source_id )
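
The three subscriber messages can be pattern matched directly in handle_info/2. The following is a sketch only: it uses a plain GenServer rather than a scene, and the module name Example.TemperatureWatcher, the :temperature id, and the shape of the state map are assumptions made for illustration.

```elixir
defmodule Example.TemperatureWatcher do
  # Hypothetical subscriber. The module name, the :temperature id, and the
  # state map are assumptions; any process can subscribe, not only scenes.
  use GenServer

  @source_id :temperature

  def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, opts)

  @impl true
  def init(_opts) do
    # Subscribing works even if the source has not registered yet.
    :ok = Scenic.PubSub.subscribe(@source_id)
    {:ok, %{online?: false, last: nil}}
  end

  @impl true
  # The source published new data.
  def handle_info({{Scenic.PubSub, :data}, {@source_id, value, timestamp}}, state) do
    {:noreply, %{state | last: {value, timestamp}}}
  end

  # The source registered and is now online.
  def handle_info({{Scenic.PubSub, :registered}, {@source_id, _opts}}, state) do
    {:noreply, %{state | online?: true}}
  end

  # The source unregistered or its process exited.
  def handle_info({{Scenic.PubSub, :unregistered}, @source_id}, state) do
    {:noreply, %{state | online?: false}}
  end

  # Ignore any other message.
  def handle_info(_msg, state), do: {:noreply, state}
end
```

The catch-all clause keeps the process from crashing on messages it does not recognize.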

Other functions

Any process can get data from a source on demand, whether or not it is a subscriber.

  Scenic.PubSub.get( source_id )
  >> data

Any process can list the currently registered data sources.

  Scenic.PubSub.list()
  >> [{source_id, opts, pid}]

Summary

Functions

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

fetch(source_id)

Retrieve the cached data for a named data source.

get(source_id)

Retrieve the cached data value for a named data source.

get!(source_id)

Retrieve the cached data value for a named data source.

list()

List the registered data sources.

publish(source_id, data)

Publish a data point from a data source.

query(source_id)

Retrieve the full cached data for a named data source.

register(source_id, opts \\ [])

Register the calling process as a data source for the named id.

subscribe(source_id)

Subscribe the calling process to receive events about a data source.

unregister(source_id)

Unregister the calling process as the data source for the named id.

unsubscribe(source_id)

Unsubscribe the calling process from receiving events about a data source.

Functions

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

See Supervisor.

fetch(source_id)

@spec fetch(source_id :: atom()) :: {:ok, any()} | {:error, :not_found}

Retrieve the cached data for a named data source.

This data is pulled from an :ets table and does not put load on the data source itself.

Parameters

  • source_id an atom that is registered to a data source.

Return Value

  {:ok, {source_id, data, timestamp}}
  • source_id is the atom representing the data source.
  • data is whatever data the data source last published.
  • timestamp is the time, from :os.system_time(:micro_seconds), at which the data was last published.

If the data source is either not registered, or has not yet published any data, fetch returns

  {:error, :not_found}

get(source_id)

@spec get(source_id :: atom()) :: any() | nil

Retrieve the cached data value for a named data source.

This data is pulled from an :ets table and does not put load on the data source itself.

Parameters

  • source_id an atom that is registered to a data source.

Return Value

  data

If the data source is either not registered, or has not yet published any data, get returns

  nil
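
Since get/1 returns either the cached value or nil, callers typically need a fallback for the case where nothing has been published yet. A small sketch of that pattern follows; the module name Example.Readout and the label text are assumptions made for illustration.

```elixir
defmodule Example.Readout do
  # Hypothetical helper module; the name and the label text are assumptions.

  # Look up the cached value without messaging the data source process.
  def current(source_id) when is_atom(source_id) do
    source_id |> Scenic.PubSub.get() |> label()
  end

  # get/1 returns nil when the source is unregistered or has not published.
  def label(nil), do: "no data yet"
  def label(value), do: "current value: #{inspect(value)}"
end
```

Note that a source which deliberately publishes nil cannot be told apart from a missing one this way.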

get!(source_id)

@spec get!(source_id :: atom()) :: any()

Retrieve the cached data value for a named data source.

Raises an error if the data source is not registered or has not yet published any data.

This data is pulled from an :ets table and does not put load on the data source itself.

Parameters

  • source_id an atom that is registered to a data source.

Return Value

  data

If the data source is either not registered, or has not yet published any data, get! raises an error instead of returning nil.

list()

@spec list() :: [{atom(), Keyword.t(), pid()}]

List the registered data sources.

Return Value

list/0 returns a list of registered data sources:

  [{source_id, opts, pid}]
  • source_id is the atom representing the data source.
  • opts is a keyword list of metadata about the data source.
    • :version is the version string supplied by the data source during registration.
    • :description is the description string supplied by the data source during registration.
    • :registered_at is the system time at which the data source was registered.
  • pid is the pid of the data source's process.
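
Following the @spec for list/0, each entry is a three-element tuple whose second element is the opts keyword list. As a sketch, the entries could be turned into readable lines like this; the module name Example.SourceReport, the fallback strings, and the output format are assumptions made for illustration.

```elixir
defmodule Example.SourceReport do
  # Hypothetical helper; the module name and output format are assumptions.

  # Build one line of text per registered data source.
  def lines do
    Enum.map(Scenic.PubSub.list(), &line/1)
  end

  # Each entry is {source_id, opts, pid}; opts is a keyword list of metadata.
  def line({source_id, opts, pid}) when is_atom(source_id) and is_pid(pid) do
    version = Keyword.get(opts, :version, "unknown")
    description = Keyword.get(opts, :description, "no description")
    "#{inspect(source_id)} (version #{version}): #{description}"
  end
end
```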

publish(source_id, data)

@spec publish(source_id :: atom(), data :: any()) :: :ok

Publish a data point from a data source.

When a data source uses publish/2 to publish data, that data is recorded in the cache and a

  {{Scenic.PubSub, :data}, {source_id, my_value, timestamp}}

message is sent to each subscriber. The timestamp is the current time in microseconds as returned from :os.system_time(:micro_seconds).

Parameters

  • source_id an atom that is registered to a data source.
  • data the data to publish.

Return Value

On success, returns :ok.

It returns {:error, :not_registered} if the caller is not the registered process for the data source.

query(source_id)

@spec query(source_id :: atom()) :: {:ok, any()} | {:error, :not_found}

Retrieve the full cached data for a named data source.

This data is pulled from an :ets table and does not put load on the data source itself.

Parameters

  • source_id an atom that is registered to a data source.

Return Value

  {:ok, {source_id, data, timestamp}}
  • source_id is the atom representing the data source.
  • data is whatever data the data source last published.
  • timestamp is the time, from :os.system_time(:micro_seconds), at which the data was last published.

If the data source is either not registered, or has not yet published any data, query returns

  {:error, :not_found} 
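
The timestamp returned by query/1 makes it possible to reject stale readings. The sketch below assumes a five-second freshness limit; the module name Example.Freshness, the limit, and the :stale error atom are illustrative assumptions and are not part of Scenic.

```elixir
defmodule Example.Freshness do
  # Hypothetical helper; the module name, the 5-second limit, and the
  # :stale atom are assumptions for illustration.
  @max_age_us 5_000_000

  # Returns {:ok, data} only when the cached value is recent enough.
  # :microsecond is the current name of the unit the docs call :micro_seconds.
  def fresh_value(source_id, now_us \\ :os.system_time(:microsecond)) do
    case Scenic.PubSub.query(source_id) do
      {:ok, {^source_id, data, timestamp}} -> check_age(data, timestamp, now_us)
      {:error, _reason} = error -> error
    end
  end

  # Compare the publish timestamp (microseconds) against the current time.
  def check_age(data, timestamp, now_us) when now_us - timestamp <= @max_age_us do
    {:ok, data}
  end

  def check_age(_data, _timestamp, _now_us), do: {:error, :stale}
end
```

Passing the current time as an argument keeps the age check a pure function, which makes it easy to exercise without a running data source.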

register(source_id, opts \\ [])

@spec register(source_id :: atom(), opts :: Keyword.t()) ::
  {:ok, atom()} | {:error, :already_registered}

Register the calling process as a data source for the named id.

Parameters

  • source_id the data source being registered.
  • opts optional information about the data source.

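Supported options:

  • :version a version string for the data source.
  • :description a description string for the data source.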

Return Value

On success, returns {:ok, source_id}

If source_id is already registered to another process, it returns

  {:error, :already_registered}
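
Both return values of register/2 can be handled when the data source process starts. In this sketch the process refuses to start if another process already owns the id. The module name Example.GuardedSensor, the option values, and the choice to stop are assumptions made for illustration.

```elixir
defmodule Example.GuardedSensor do
  # Hypothetical data source; the module name, the option values, and the
  # decision to stop on a conflict are assumptions for illustration.
  use GenServer

  def start_link(source_id) when is_atom(source_id) do
    GenServer.start_link(__MODULE__, source_id)
  end

  @impl true
  def init(source_id) do
    source_id
    |> Scenic.PubSub.register(version: "0.1.0", description: "Example sensor")
    |> init_result()
  end

  # Translate the result of register/2 into a GenServer init return value.
  def init_result({:ok, source_id}), do: {:ok, %{source_id: source_id}}
  def init_result({:error, :already_registered}), do: {:stop, :already_registered}
end
```

Stopping is only one possible policy; a process could equally wait and retry after the current owner unregisters.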

subscribe(source_id)

@spec subscribe(source_id :: atom()) :: :ok

Subscribe the calling process to receive events about a data source.

The messages the subscriber will start receiving about a data source are:

event               | message sent to subscribers
--------------------|--------------------------------------------------------
data published      | {{Scenic.PubSub, :data}, {source_id, value, timestamp}}
source registered   | {{Scenic.PubSub, :registered}, {source_id, opts}}
source unregistered | {{Scenic.PubSub, :unregistered}, source_id}

Parameters

  • source_id an atom that is registered to a data source.

Return Value

On success, returns :ok.

unregister(source_id)

@spec unregister(source_id :: atom()) :: :ok

Unregister the calling process as the data source for the named id.

Parameters

  • source_id the data source being unregistered.

Return Value

Returns :ok

unsubscribe(source_id)

@spec unsubscribe(source_id :: atom()) :: :ok

Unsubscribe the calling process from receiving events about a data source.

The caller will stop receiving events about the data source.

Parameters

  • source_id an atom that is registered to a data source.

Return Value

Returns :ok