space_ex v0.8.0 SpaceEx.Stream
Enables data to be streamed as it changes, instead of polled repeatedly.
Streams are an efficient way to repeatedly retrieve data from the game. In most situations where you find yourself asking for the same piece of data over and over, you’re probably better off using a stream. This will reduce the load on both ends and make your code run faster.
However, if your code checks the value infrequently (e.g. once per second or less), but the value is changing constantly (altitude, current time, etc.), you should consider either using polling, or reducing the stream’s rate.
Creating streams
To set up a stream, you can use the SpaceEx.Stream.stream/2 macro to wrap a
procedure call.
Alternatively, you can use SpaceEx.ProcedureCall.create/1 to create a reference
to the procedure you want to run, and then pass that to
SpaceEx.Stream.create/2.
Stream lifecycle
Generally, multiple requests to stream the exact same data will be detected by the kRPC server, and each request will return the same stream.
This would ordinarily be a problem for a multi-process environment like
Erlang. If one process calls shutdown/1, it might remove a stream that
other processes are currently using.
To prevent this, when you create a stream, you also create a bond between
that stream and your current process. Calling shutdown/1 will break that
bond, but the stream will only actually shut down once no bonded processes
remain, i.e. every bonded process has either broken its bond or exited.
Streams will also automatically shut down if all bonded processes terminate.
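The bonding idea can be illustrated with plain process monitors. The following is a minimal sketch, not SpaceEx's actual implementation: the BondSketch module and its bond/2, release/2 and bonds/1 functions are hypothetical names invented for this example. It tracks which processes are bonded to a shared resource and drops a bond either on request or when the bonded process dies.

```elixir
defmodule BondSketch do
  # Illustration only: NOT how SpaceEx is implemented internally.
  # Tracks the set of processes "bonded" to some shared resource.
  use GenServer

  def start_link, do: GenServer.start_link(__MODULE__, %{})

  # Bond `pid` (default: the caller) to the resource.
  def bond(server, pid \\ self()), do: GenServer.call(server, {:bond, pid})

  # Explicitly break the bond for `pid` (default: the caller).
  def release(server, pid \\ self()), do: GenServer.call(server, {:release, pid})

  # Number of bonds still in place; the resource could shut down at 0.
  def bonds(server), do: GenServer.call(server, :bonds)

  @impl true
  def init(state), do: {:ok, state}

  @impl true
  def handle_call({:bond, pid}, _from, bonds) do
    # Monitor each process only once, remembering the monitor reference.
    bonds = Map.put_new_lazy(bonds, pid, fn -> Process.monitor(pid) end)
    {:reply, :ok, bonds}
  end

  def handle_call({:release, pid}, _from, bonds) do
    {ref, bonds} = Map.pop(bonds, pid)
    if ref, do: Process.demonitor(ref, [:flush])
    {:reply, :ok, bonds}
  end

  def handle_call(:bonds, _from, bonds), do: {:reply, map_size(bonds), bonds}

  @impl true
  def handle_info({:DOWN, _ref, :process, pid, _reason}, bonds) do
    # A bonded process exited, so its bond disappears automatically.
    {:noreply, Map.delete(bonds, pid)}
  end
end
```

In this sketch, the shared resource would only be torn down once bonds/1 reaches zero, which mirrors the rule described above: one process letting go does not affect the others.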
Example usage
require SpaceEx.Stream
stream =
  SpaceEx.SpaceCenter.ut(conn)
  |> SpaceEx.Stream.stream()

# This is equivalent to ...
# require SpaceEx.ProcedureCall
# stream =
#   SpaceEx.SpaceCenter.ut(conn)
#   |> SpaceEx.ProcedureCall.create()
#   |> SpaceEx.Stream.create()
SpaceEx.Stream.get(stream) # 83689.09043863538
Process.sleep(100)
SpaceEx.Stream.get(stream) # 83689.1904386354
Process.sleep(100)
SpaceEx.Stream.get(stream) # 83689.29043863542
# You can also create a nifty "magic variable" shorthand:
ut = SpaceEx.Stream.get_fn(stream)
ut.() # 83689.29043863544
Process.sleep(100)
ut.() # 83689.39043863546
Process.sleep(100)
ut.() # 83689.49043863548
# You can even create both the stream and the shortcut at once:
{stream, ut} =
  SpaceEx.SpaceCenter.ut(conn)
  |> SpaceEx.Stream.stream()
  |> SpaceEx.Stream.with_get_fn()
SpaceEx.Stream.get(stream) # 83689.49043863541
ut.() # 83689.49043863541
# Or, for added efficiency, you can subscribe to a stream ...
SpaceEx.Stream.subscribe(stream)
# ... and then receive messages in your process mailbox.
SpaceEx.Stream.receive_latest(stream) # 83689.49043863541
Process.sleep(100)
SpaceEx.Stream.receive_latest(stream) # 83689.59043863543
Process.sleep(100)
SpaceEx.Stream.receive_latest(stream) # 83689.69043863545
# But don't forget to unsubscribe when you're done,
# because the messages keep coming whether you're
# receiving them or not. :)
# (Unless your process just exits here.)
SpaceEx.Stream.unsubscribe(stream)
Summary
Functions
Creates a stream, and optionally starts it
Get (and decode) the current value from a stream
Returns an anonymous function that can be used to query the stream
Receives the latest value from a stream subscribed to via subscribe/2
Receives the next value from a stream subscribed to via subscribe/2
Detach from a stream, and shut it down if possible
Set the update rate of a stream
Start a previously added stream
Creates a stream directly from function call syntax
Receive the next decoded value from a stream as a message
Cancels a previous subscription created by subscribe/2
Wait for the stream value to change
Convenience function to return a stream and a get_fn/1 function
Functions
Creates a stream, and optionally starts it.
procedure should be a SpaceEx.ProcedureCall structure. The stream’s value
will be the result of calling this procedure over and over (with the same
arguments each time).
See the module documentation for usage examples.
Options
:start - when false, the stream is created, but not started. Default: start: true.
:rate - the stream’s update rate, in updates per second. Default: unlimited.
Get (and decode) the current value from a stream.
This will retrieve the latest stream value and decode it. (Because streams can receive hundreds of updates every second, stream values are not decoded until requested.)
Note that if a stream has not received any data yet, this function will block
for up to timeout milliseconds (default: 5 seconds) for the first
value to arrive.
A timeout usually indicates that the stream was created with start: false,
and it was not subsequently started before the timeout expired.
Returns an anonymous function that can be used to query the stream.
This function can make code cleaner, by emulating a sort of “magic variable”
that constantly updates itself to the current value. For example, if you
assign apoapsis = SpaceEx.Stream.get_fn(apo_stream), you can now use
apoapsis.() to get the up-to-date value at any time.
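Because the "magic variable" is just a zero-arity function, helper code can accept it without knowing anything about streams. The sketch below assumes a hypothetical MagicVar.wait_until/3 helper (not part of SpaceEx) that polls such a function until its value satisfies a condition.

```elixir
defmodule MagicVar do
  # Hypothetical helper, not part of SpaceEx.
  # Calls the zero-arity `value_fn` repeatedly until `done?` returns a truthy
  # value for the result, sleeping `interval_ms` between checks.
  # Returns the first value that satisfied the condition.
  def wait_until(value_fn, done?, interval_ms \\ 100) when is_function(value_fn, 0) do
    value = value_fn.()

    if done?.(value) do
      value
    else
      Process.sleep(interval_ms)
      wait_until(value_fn, done?, interval_ms)
    end
  end
end
```

With the apoapsis function from the paragraph above, a call might look like MagicVar.wait_until(apoapsis, &(&1 > 100_000)). Since the helper only needs a function, it can also be exercised with a fake function in tests, without a running game.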
Receives the latest value from a stream subscribed to via subscribe/2.
If no stream results for stream are in the process mailbox, this will block
(up to timeout milliseconds) until a result is received. Otherwise, it
will immediately return the latest stream result and discard the rest.
This is generally the best way to receive results from a subscribed stream, since most code is only concerned with the current value of the stream. If your receive loop is sufficiently fast (relative to the stream rate), it can process every single value; otherwise, it will skip values in order to stay current and not flood the process mailbox.
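The "return the latest and discard the rest" behaviour can be pictured as draining the mailbox. The following is a rough sketch of that idea in plain Elixir, not the library's actual code. It assumes messages shaped like {:stream_result, id, result}, and the {:ok, _} / {:error, :timeout} return values are this sketch's own choice, not something the library is known to return.

```elixir
defmodule LatestSketch do
  # Illustration only. Blocks up to `timeout` ms for the first result for
  # `id`, then drains any further results already queued for that id and
  # returns the newest one.
  def receive_latest(id, timeout \\ 5_000) do
    receive do
      {:stream_result, ^id, result} -> drain(id, result)
    after
      timeout -> {:error, :timeout}
    end
  end

  # `after 0` makes this receive non-blocking: it stops as soon as no more
  # matching messages are waiting in the mailbox.
  defp drain(id, latest) do
    receive do
      {:stream_result, ^id, newer} -> drain(id, newer)
    after
      0 -> {:ok, latest}
    end
  end
end
```

Messages for other ids are left untouched in the mailbox, because the receive patterns pin the id.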
Receives the next value from a stream subscribed to via subscribe/2.
If no stream results for stream are in the process mailbox, this will block
(up to timeout milliseconds) until a result is received. Otherwise, it
will immediately return the first stream result in the process mailbox.
This can be used if your code must monitor every received stream result, e.g. if you’re monitoring for a rare abnormal value in the data. This is a relatively rare use case — especially since streams have a polling rate, so there’s no guarantee you’ll be able to catch said value at all.
In most cases, your code only needs the current value of a stream, and so you
should use receive_latest/2 instead.
Loop performance & falling behind
Since messages are being constantly sent to your process, and receive_next
does not skip messages, your loop needs to be fast enough (relative to the
stream rate) to process all messages and not fall behind. Otherwise, the
process mailbox will continually grow with more and more pending results, and
the results processed by your code will be more and more out-of-date.
Falling behind can often be subtle and hard to detect, so receive_next
has a built-in safeguard by default. The :max_age option indicates the
maximum age (in milliseconds) of a returned result. If we would return a
result older than that, a SpaceEx.Stream.StaleDataError is raised instead.
If your code must monitor every value, then a SpaceEx.Stream.StaleDataError
is a fatal error — you should increase the speed of your code or pick a lower
stream rate. Otherwise, there are various ways to handle this error. For
example, you could process the data anyway (via the :result field in the
error), or you could issue a one-off receive_latest/2 call to flush all
pending data and start over from the latest. However, if you’re encountering
this error regularly, you should probably rethink your approach.
Options
:timeout - Maximum time (in milliseconds) to wait for the next stream result, or :infinity to wait forever. Default: 5000
:max_age - Maximum age of the next stream result, or :infinity for no limit. Default: 1000
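To make the staleness safeguard concrete, here is a small self-contained sketch of the idea. Everything in it is hypothetical: the StaleSketch module, its nested StaleError exception, and the {:sample, value, sent_at_ms} message shape are invented for illustration and do not reflect how SpaceEx timestamps or delivers results. Exiting on timeout is likewise this sketch's own choice.

```elixir
defmodule StaleSketch do
  defmodule StaleError do
    # Carries the stale value so the caller can still choose to use it.
    defexception [:result, :age_ms, message: "stream result is stale"]
  end

  # Assumes messages look like {:sample, value, sent_at_ms}, where sent_at_ms
  # was taken from System.monotonic_time(:millisecond) by the sender.
  def receive_next(timeout \\ 5_000, max_age \\ 1_000) do
    receive do
      {:sample, value, sent_at} ->
        age = System.monotonic_time(:millisecond) - sent_at

        if max_age != :infinity and age > max_age do
          raise StaleError, result: value, age_ms: age
        end

        value
    after
      timeout -> exit(:timeout)
    end
  end
end
```

A caller that falls behind would see the error on the oldest queued message first. It could then rescue the error and either read the value from the :result field or flush the backlog and continue from the newest value, which are the two recovery options described above.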
Detach from a stream, and shut it down if possible.
Streams will not shut down until all processes that depend on this stream have exited or have called this function. This is to prevent streams unexpectedly closing for all processes, just because one of them is done.
Set the update rate of a stream.
rate is the number of updates per second. Setting the rate to 0 or nil
will remove all rate limiting and update as often as possible.
Start a previously added stream.
If a stream is created with start: false, you can use this function to
choose when to start receiving data.
Creates a stream directly from function call syntax.
This is equivalent to calling SpaceEx.ProcedureCall.create/1 and SpaceEx.Stream.create/2.
Example
stream =
  SpaceEx.SpaceCenter.Flight.mean_altitude(flight)
  |> SpaceEx.Stream.stream()
SpaceEx.Stream.get(stream) # 76.64177794696297
Receive the next decoded value from a stream as a message.
This is the non-blocking version of wait/2. As soon as the stream receives
a value, a message will be delivered to the calling process.
Messages will continue to be sent until the calling process calls
unsubscribe/1 (unless the :remove option is true; see below).
It’s recommended that you use either receive_latest/2 or receive_next/2
to receive messages from streams. These functions are designed to prevent
unexpected results if your code processes stream messages slower than the
stream generates them.
If you choose to receive stream results directly instead, the format is
{:stream_result, id, result} where id is the value of stream.id and
result is a SpaceEx.Stream.Result structure.
Options
:immediate - if true and the stream has already received at least one result, the latest result will be sent immediately. The subscription will continue normally after that. Default: false
:remove - if true, then remove/1 (and unsubscribe/1) will be called immediately after sending the first subscribed result. Only one message will be delivered. Default: false
If both :immediate and :remove are true, and the stream has received at
least one result, then the latest result will be sent immediately, no further
messages will be sent (i.e. no subscription), and remove/1 will be called.
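If you do handle the raw {:stream_result, id, result} messages yourself, a GenServer is one natural place to do it, since they arrive as ordinary messages in handle_info/2. The sketch below uses a hypothetical ResultCache module that simply remembers the most recent result per stream id. It does not decode results, and it does not subscribe to anything itself. Because messages go to the calling process, the subscription would presumably have to be made from inside the server process (for example in init/1), which this sketch leaves out.

```elixir
defmodule ResultCache do
  # Illustration only: keeps the most recent raw result per stream id.
  use GenServer

  def start_link, do: GenServer.start_link(__MODULE__, %{})

  # Returns {:ok, result} for the newest result seen for `id`, or :error.
  def latest(server, id), do: GenServer.call(server, {:latest, id})

  @impl true
  def init(state), do: {:ok, state}

  @impl true
  def handle_call({:latest, id}, _from, state) do
    {:reply, Map.fetch(state, id), state}
  end

  @impl true
  def handle_info({:stream_result, id, result}, state) do
    # Newer results overwrite older ones, so the mailbox never backs up.
    {:noreply, Map.put(state, id, result)}
  end

  # Ignore anything that is not a stream result.
  def handle_info(_other, state), do: {:noreply, state}
end
```

Overwriting on every message gives the same "only the current value matters" behaviour that receive_latest/2 is recommended for, while keeping the process responsive to other calls.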
Cancels a previous subscription created by subscribe/2.
The calling process will no longer receive stream result messages for the given stream. Note that there may still be stream results already waiting in the process mailbox, but no more will be added once this function returns.
Wait for the stream value to change.
This will wait until a new stream value is received, then retrieve it and
decode it. It will block for up to timeout milliseconds (default:
forever, aka :infinity), after which it will throw an exit signal.
You can technically catch this exit signal with try ... catch :exit, _,
but it’s not generally considered good practice to do so. As such, wait/2
timeouts should generally be reserved for “something has gone badly wrong”.
Example
paused = SpaceEx.SpaceCenter.paused(conn) |> SpaceEx.Stream.stream()
SpaceEx.Stream.wait(paused) # returns true/false the next time you un/pause
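For completeness, this is roughly what catching an exit looks like in Elixir. The ExitSketch.safely/1 helper is hypothetical and shown only to illustrate the try ... catch :exit mechanism mentioned above. As noted, treating timeouts as real failures is usually the better design.

```elixir
defmodule ExitSketch do
  # Hypothetical helper: runs `fun` and converts an exit into a tagged tuple
  # instead of letting it take down the calling process.
  def safely(fun) when is_function(fun, 0) do
    try do
      {:ok, fun.()}
    catch
      :exit, reason -> {:exit, reason}
    end
  end
end
```

A call such as ExitSketch.safely(fn -> SpaceEx.Stream.wait(paused) end) would then return either {:ok, value} or {:exit, reason}. The exact shape of the exit reason depends on the library and is not assumed here.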
Convenience function to return a stream and a get_fn/1 function.
When passed a stream object, will return {stream, get_fn(stream)}.
Example
{stream, altitude} =
  SpaceEx.SpaceCenter.Flight.mean_altitude(flight)
  |> SpaceEx.Stream.stream()
  |> SpaceEx.Stream.with_get_fn()
altitude.() |> IO.inspect # 76.64177794696297