space_ex v0.8.0 SpaceEx.Stream

Enables data to be streamed as it changes, instead of polled repeatedly.

Streams are an efficient way to repeatedly retrieve data from the game. In most situations where you find yourself asking for the same piece of data over and over, you’re probably better off using a stream. This will reduce the load on both ends and make your code run faster.

However, if your code checks the value infrequently (e.g. once per second or less), but the value is changing constantly (altitude, current time, etc.), you should consider either using polling, or reducing the stream’s rate.

Creating streams

To set up a stream, you can use the SpaceEx.Stream.stream/2 macro to wrap a procedure call.

Alternatively, you can use SpaceEx.ProcedureCall.create/1 to create a reference to the procedure you want to run, and then pass that to SpaceEx.Stream.create/2.

Stream lifecycle

Generally, multiple requests to stream the exact same data will be detected by the kRPC server, and each request will return the same stream.

This would ordinarily be a problem for a multi-process environment like Erlang. If one process calls shutdown/1, it might remove a stream that other processes are currently using.

To prevent this, when you create a stream, you also create a bond between that stream and your current process. Calling shutdown/1 breaks that bond, but the stream only actually shuts down once no bonds remain, i.e. once every bonded process has either called shutdown/1 or exited.

Streams will also automatically shut down if all bonded processes terminate.
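
The bonding rule can be pictured as reference counting over live processes. The following is a minimal sketch in plain Elixir and is not SpaceEx's actual implementation: the BondSketch module and its bond/2 and release/2 functions are hypothetical names invented for illustration.

```elixir
defmodule BondSketch do
  # Hypothetical illustration only; not how SpaceEx tracks bonds internally.
  use GenServer

  def start_link, do: GenServer.start_link(__MODULE__, MapSet.new())
  def bond(server, pid \\ self()), do: GenServer.call(server, {:bond, pid})
  def release(server, pid \\ self()), do: GenServer.call(server, {:release, pid})

  @impl true
  def init(bonds), do: {:ok, bonds}

  @impl true
  def handle_call({:bond, pid}, _from, bonds) do
    # Monitor the bonded process so its exit is noticed.
    Process.monitor(pid)
    {:reply, :ok, MapSet.put(bonds, pid)}
  end

  def handle_call({:release, pid}, _from, bonds) do
    bonds = MapSet.delete(bonds, pid)

    if MapSet.size(bonds) == 0 do
      # Last bond broken explicitly: the "stream" shuts down.
      {:stop, :normal, :shut_down, bonds}
    else
      {:reply, :still_in_use, bonds}
    end
  end

  @impl true
  def handle_info({:DOWN, _ref, :process, pid, _reason}, bonds) do
    # A bonded process exited: drop its bond, and stop if none remain.
    bonds = MapSet.delete(bonds, pid)
    if MapSet.size(bonds) == 0, do: {:stop, :normal, bonds}, else: {:noreply, bonds}
  end
end

{:ok, stream} = BondSketch.start_link()
other = spawn(fn -> Process.sleep(:infinity) end)

:ok = BondSketch.bond(stream)         # bond the current process
:ok = BondSketch.bond(stream, other)  # bond a second process

BondSketch.release(stream)  # :still_in_use, because `other` is still bonded
Process.exit(other, :kill)  # the last bonded process dies, so the sketch server stops
```

In this sketch an explicit release and a process exit are treated identically, which mirrors the two ways a bond is described as ending above.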

Example usage

require SpaceEx.Stream

stream =
  SpaceEx.SpaceCenter.ut(conn)
  |> SpaceEx.Stream.stream()

# This is equivalent to ...
#require SpaceEx.ProcedureCall
#stream =
#  SpaceEx.SpaceCenter.ut(conn)
#  |> SpaceEx.ProcedureCall.create()
#  |> SpaceEx.Stream.create()

SpaceEx.Stream.get(stream)  # 83689.09043863538
Process.sleep(100)
SpaceEx.Stream.get(stream)  # 83689.1904386354
Process.sleep(100)
SpaceEx.Stream.get(stream)  # 83689.29043863542

# You can also create a nifty "magic variable" shorthand:
ut = SpaceEx.Stream.get_fn(stream)

ut.()  # 83689.29043863544
Process.sleep(100)
ut.()  # 83689.39043863546
Process.sleep(100)
ut.()  # 83689.49043863548

# You can even create both the stream and the shortcut at once:
{stream, ut} =
  SpaceEx.SpaceCenter.ut(conn)
  |> SpaceEx.Stream.stream()
  |> SpaceEx.Stream.with_get_fn()

SpaceEx.Stream.get(stream)  # 83689.49043863541
ut.()  # 83689.49043863541

# Or, for added efficiency, you can subscribe to a stream ...
SpaceEx.Stream.subscribe(stream)

# ... and then receive messages in your process mailbox.
SpaceEx.Stream.receive_latest(stream)  # 83689.49043863541
Process.sleep(100)
SpaceEx.Stream.receive_latest(stream)  # 83689.59043863543
Process.sleep(100)
SpaceEx.Stream.receive_latest(stream)  # 83689.69043863545

# But don't forget to unsubscribe when you're done,
# because the messages keep coming whether you're
# receiving them or not. :)
# (Unless your process just exits here.)
SpaceEx.Stream.unsubscribe(stream)

Summary

Functions

Creates a stream, and optionally starts it

Get (and decode) the current value from a stream

Returns an anonymous function that can be used to query the stream

Receives the latest value from a stream subscribed to via subscribe/2

Receives the next value from a stream subscribed to via subscribe/2

Detach from a stream, and shut it down if possible

Set the update rate of a stream

Start a previously added stream

Creates a stream directly from function call syntax

Receive the next decoded value from a stream as a message

Cancels a previous subscription created by subscribe/2

Wait for the stream value to change

Convenience function to return a stream and a get_fn/1 function

Functions

create(procedure, opts \\ [])

Creates a stream, and optionally starts it.

procedure should be a SpaceEx.ProcedureCall structure. The stream’s value will be the result of calling this procedure over and over (with the same arguments each time).

See the module documentation for usage examples.

Options

  • :start — when false, the stream is created, but not started. Default: start: true.
  • :rate — the stream’s update rate, in updates per second. Default: unlimited.

get(stream, timeout \\ 5000)

Get (and decode) the current value from a stream.

This will retrieve the latest stream value and decode it. (Because streams can receive hundreds of updates every second, stream values are not decoded until requested.)

Note that if a stream has not received any data yet, this function will block for up to timeout milliseconds (default: 5 seconds) for the first value to arrive.

A timeout usually indicates that the stream was created with start: false, and it was not subsequently started before the timeout expired.

get_fn(stream)

Returns an anonymous function that can be used to query the stream.

This function can make code cleaner, by emulating a sort of “magic variable” that constantly updates itself to the current value. For example, if you assign apoapsis = SpaceEx.Stream.get_fn(apo_stream), you can now use apoapsis.() to get the up-to-date value at any time.
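
To make the “magic variable” idea concrete without a running game, here is a small sketch in which an Agent stands in for the stream. The fake_stream name and the values are assumptions for illustration; the only point is that a zero-arity closure re-reads the current value each time it is called.

```elixir
# Stand-in for a stream: an Agent holding a value that changes over time.
{:ok, fake_stream} = Agent.start_link(fn -> 100.0 end)

# The "magic variable": a zero-arity closure that reads the current value.
apoapsis = fn -> Agent.get(fake_stream, & &1) end

first = apoapsis.()
Agent.update(fake_stream, fn _ -> 250.0 end)
second = apoapsis.()

IO.inspect({first, second})  # {100.0, 250.0}
```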

receive_latest(stream, timeout \\ 5000)

Receives the latest value from a stream subscribed to via subscribe/2.

If no stream results for stream are in the process mailbox, this will block (up to timeout milliseconds) until a result is received. Otherwise, it will immediately return the latest stream result and discard the rest.

This is generally the best way to receive results from a subscribed stream, since most code is only concerned with the current value of the stream. If your receive loop is sufficiently fast (relative to the stream rate), it can process every single value; otherwise, it will skip values in order to stay current and not flood the process mailbox.
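
The "return the latest and discard the rest" behaviour can be sketched with a plain selective receive. This is an illustration of the technique, not the library's code: LatestSketch is a hypothetical module, the messages carry plain values rather than SpaceEx.Stream.Result structures, and returning :timeout on expiry is an assumption.

```elixir
defmodule LatestSketch do
  # Hypothetical illustration; message shape follows {:stream_result, id, result}.
  def receive_latest(id, timeout \\ 5000) do
    receive do
      # Block until at least one result for this id is available ...
      {:stream_result, ^id, result} -> drain(id, result)
    after
      timeout -> :timeout
    end
  end

  # ... then keep consuming queued results without blocking,
  # remembering only the most recent one.
  defp drain(id, latest) do
    receive do
      {:stream_result, ^id, newer} -> drain(id, newer)
    after
      0 -> latest
    end
  end
end

for value <- [1, 2, 3], do: send(self(), {:stream_result, :demo, value})

LatestSketch.receive_latest(:demo)  # 3
```

Because the mailbox is scanned oldest-first, the last message matched before the zero timeout fires is the newest one.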

receive_next(stream, opts \\ [])

Receives the next value from a stream subscribed to via subscribe/2.

If no stream results for stream are in the process mailbox, this will block (for up to the number of milliseconds given by the :timeout option) until a result is received. Otherwise, it will immediately return the first stream result in the process mailbox.

This can be used if your code must monitor every received stream result, e.g. if you’re monitoring for a rare abnormal value in the data. This is a relatively rare use case, especially since streams are sampled at a finite rate: a short-lived value can fall between two updates, so there’s no guarantee you’ll be able to catch it at all.

In most cases, your code only needs the current value of a stream, and so you should use receive_latest/2 instead.

Loop performance & falling behind

Since messages are being constantly sent to your process, and receive_next does not skip messages, your loop needs to be fast enough (relative to the stream rate) to process all messages and not fall behind. Otherwise, the process mailbox will continually grow with more and more pending results, and the results processed by your code will be more and more out-of-date.

Falling behind can often be subtle and hard to detect, so receive_next has a built-in safeguard by default. The :max_age option indicates the maximum age (in milliseconds) of a returned result. If we would return a result older than that, a SpaceEx.Stream.StaleDataError is raised instead.

If your code must monitor every value, then a SpaceEx.Stream.StaleDataError is a fatal error — you should increase the speed of your code or pick a lower stream rate. Otherwise, there are various ways to handle this error. For example, you could process the data anyway (via the :result field in the error), or you could issue a one-off receive_latest/2 call to flush all pending data and start over from the latest. However, if you’re encountering this error regularly, you should probably rethink your approach.

Options

  • :timeout — Maximum time (in milliseconds) to wait for the next stream result, or :infinity to wait forever. Default: 5000
  • :max_age — Maximum age of the next stream result, or :infinity for no limit. Default: 1000
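
The :max_age safeguard amounts to comparing a result's timestamp against the current time before returning it. The sketch below shows that idea in plain Elixir under stated assumptions: NextSketch and StaleSketchError are hypothetical stand-ins for receive_next and SpaceEx.Stream.StaleDataError, and the {:sample, sent_at, value} message shape is invented for the example.

```elixir
defmodule StaleSketchError do
  # Hypothetical stand-in for SpaceEx.Stream.StaleDataError.
  defexception [:value, :age]

  @impl true
  def message(%{age: age}), do: "result is #{age} ms old"
end

defmodule NextSketch do
  def receive_next(opts \\ []) do
    timeout = Keyword.get(opts, :timeout, 5000)
    max_age = Keyword.get(opts, :max_age, 1000)

    receive do
      {:sample, sent_at, value} ->
        age = System.monotonic_time(:millisecond) - sent_at

        # Refuse to hand back data older than max_age, unless the limit is disabled.
        if max_age != :infinity and age > max_age do
          raise StaleSketchError, value: value, age: age
        else
          value
        end
    after
      timeout -> :timeout
    end
  end
end

now = System.monotonic_time(:millisecond)
send(self(), {:sample, now - 5_000, :old})
send(self(), {:sample, now, :fresh})

# The first queued sample is five seconds old, so it is rejected;
# the stale value is still reachable through the error.
outcome =
  try do
    NextSketch.receive_next()
  rescue
    error in StaleSketchError -> {:stale, error.value}
  end

IO.inspect(outcome)                   # {:stale, :old}
IO.inspect(NextSketch.receive_next()) # :fresh
```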

Detach from a stream, and shut it down if possible.

Streams will not shut down until all processes that depend on this stream have exited or have called this function. This is to prevent streams unexpectedly closing for all processes, just because one of them is done.

set_rate(stream, rate)

Set the update rate of a stream.

rate is the number of updates per second. Setting the rate to 0 or nil will remove all rate limiting and update as often as possible.

Start a previously added stream.

If a stream is created with start: false, you can use this function to choose when to start receiving data.

stream(function_call, opts \\ []) (macro)

Creates a stream directly from function call syntax.

This is equivalent to calling SpaceEx.ProcedureCall.create/1 and SpaceEx.Stream.create/2.

Example

stream =
  SpaceEx.SpaceCenter.Flight.mean_altitude(flight)
  |> SpaceEx.Stream.stream()

SpaceEx.Stream.get(stream)  # 76.64177794696297

subscribe(stream, opts \\ [])

Receive the next decoded value from a stream as a message.

This is the non-blocking version of wait/2. As soon as the stream receives a value, a message will be delivered to the calling process.

Messages will continue to be sent until the calling process calls unsubscribe/1 (unless the :remove option is true; see below).

It’s recommended that you use either receive_latest/2 or receive_next/2 to receive messages from streams. These functions are designed to prevent unexpected results if your code processes stream messages slower than the stream generates them.

If you choose to receive stream results directly instead, the format is {:stream_result, id, result} where id is the value of stream.id and result is a SpaceEx.Stream.Result structure.

Options

  • :immediate — if true and the stream has already received at least one result, the latest result will be sent immediately. The subscription will continue normally after that. Default: false
  • :remove — if true, then remove/1 (and unsubscribe/1) will be called immediately after sending the first subscribed result. Only one message will be delivered. Default: false

If both :immediate and :remove are true, and the stream has received at least one result, then the latest result will be sent immediately, no further messages will be sent (i.e. no subscription), and remove/1 will be called.

unsubscribe(stream)

Cancels a previous subscription created by subscribe/2.

The calling process will no longer receive stream result messages for the given stream. Note that there may still be stream results already waiting in the process mailbox, but no more will be added once this function returns.

wait(stream, timeout \\ :infinity)

Wait for the stream value to change.

This will wait until a new stream value is received, then retrieve it and decode it. It will block for up to timeout milliseconds (default: forever, aka :infinity), after which it will raise an exit signal rather than return a value.

You can technically catch this exit signal with try ... catch :exit, _, but it’s not generally considered good practice to do so. As such, wait/2 timeouts should generally be reserved for “something has gone badly wrong”.

Example

paused = SpaceEx.SpaceCenter.paused(conn) |> SpaceEx.Stream.stream()

SpaceEx.Stream.wait(paused)  # returns true/false the next time you un/pause
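
For readers unfamiliar with the try ... catch :exit form mentioned above, here is a sketch of its shape. It does not call wait/2: a deliberately slow Agent stands in for a stream that never delivers a value in time, and the 50 ms and 200 ms figures are arbitrary. Whether wait/2 exits with the same reason tuple is an assumption, so the pattern on the exit reason should be adapted as needed.

```elixir
# Stand-in for a stream that is too slow: an Agent whose reply takes 200 ms.
{:ok, slow} = Agent.start_link(fn -> nil end)

result =
  try do
    # Agent.get/3 exits the caller if no reply arrives within 50 ms.
    Agent.get(slow, fn _ -> Process.sleep(200) end, 50)
  catch
    # Timeouts from GenServer-style calls exit with {:timeout, details}.
    :exit, {:timeout, _call} -> :timed_out
  end

IO.inspect(result)  # :timed_out
```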

with_get_fn(stream)

Convenience function to return a stream and a get_fn/1 function.

When passed a stream, returns {stream, get_fn(stream)}.

Example

{stream, altitude} =
  SpaceEx.SpaceCenter.Flight.mean_altitude(flight)
  |> SpaceEx.Stream.stream()
  |> SpaceEx.Stream.with_get_fn()

altitude.() |> IO.inspect()  # 76.64177794696297