bquarp v0.1.0 Reactivity.DSL.Signal
The DSL for distributed reactive programming.
Summary
Functions
add_guarantee(arg, cg): Attaches a new consistency guarantee to the signal.
carries_guarantee?(arg, cg): Checks if the given signal carries the given guarantee.
each(arg, proc): Applies a procedure to the values of a signal without changing them. Generally used for side effects.
filter(arg, pred), filter(arg, pred, new_cg): Filters out the signal values that do not satisfy the given predicate.
from_plain_obs(obs), from_plain_obs(obs, cg): Creates a signal from a plain observable.
from_signal_obs(sobs), from_signal_obs(sobs, cgs): Creates a signal from a signal observable and tags it with the given guarantees.
guarantees(arg): Returns the guarantees of the given signal.
inspect(arg): Inspects the given signal by printing its output values to the console.
keep_guarantee(arg1, cgt): Keeps the given guarantee as the only guarantee of the given signal. Removes all other guarantees.
liftapp(s, func): Lifts and applies an ordinary function to one or more signals.
liftapp_var(ss, arg, func): Lifts and applies a function that operates on lists of variable size to a list of signals that can be subject to change.
merge(signals), merge(signals, new_cg): Merges multiple signals together such that the resulting signal carries the updates of all composed signals.
register(signal, name): Publishes a signal by registering it in the registry.
remove_guarantee(arg1, cgt): Removes the given guarantee from the given signal. Leaves the signal alone if the guarantee is not present.
scan(arg, func, default \\ nil): Applies a given procedure to a signal's value and its previous result. Works in the same way as the Enum.scan function.
set_guarantee(arg, cg): Sets the given consistency guarantee as the only guarantee for the given signal.
signal(name): Gets a signal from the registry by its name.
to_plain_obs(arg): Transforms a signal into a plain observable.
to_signal_obs(arg): Transforms a signal into a signal observable, meaning that both the value and context {v, c} of each observable message are preserved.
unregister(name): Unregisters a signal from the registry by its name.
Functions
add_guarantee(arg, cg)
Attaches a new consistency guarantee to the signal.
The signal may already possess one or more consistency guarantees.
carries_guarantee?(arg, cg)
Checks if the given signal carries the given guarantee.
each(arg, proc)
Applies a procedure to the values of a signal without changing them. Generally used for side effects.
filter(arg, pred)
filter(arg, pred, new_cg)
Filters out the signal values that do not satisfy the given predicate.
The predicate should take one argument, the value of an observable message, and return a boolean: true if the value should be produced, false if it should be discarded.
If no consistency guarantee is provided, the filtered signal keeps its guarantees 'as is'. The consequences of using the operator in this way are left to the developer.
Otherwise, the new consistency guarantee is attached to the filtered signal and the previous ones are discarded. Filtering in this way can thus be considered the creation of a new source signal for this guarantee in a stratified dependency graph.
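The predicate contract described above can be illustrated with a small, library-independent model. This is only a sketch: it assumes that each message is a `{value, context}` tuple (as in a signal observable) and that the predicate sees the value alone. `FilterSketch` is a hypothetical name and not part of the DSL.

```elixir
defmodule FilterSketch do
  # Hypothetical model: a signal is a list of {value, context} messages.
  # The predicate is applied to the value only; the context passes through untouched.
  def filter(messages, pred) do
    Enum.filter(messages, fn {value, _context} -> pred.(value) end)
  end
end

messages = [{1, 0}, {2, 1}, {3, 2}, {4, 3}]
even? = fn v -> rem(v, 2) == 0 end

IO.inspect(FilterSketch.filter(messages, even?))
# [{2, 1}, {4, 3}]
```

In this model the surviving contexts are no longer contiguous, which hints at why filtering without a new guarantee leaves the consequences to the developer.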
from_plain_obs(obs)
from_plain_obs(obs, cg)
Creates a signal from a plain observable.
Attaches the given consistency guarantee to it if provided.
Otherwise attaches the globally defined consistency guarantee,
which is fifo with update semantics ({:fu, 0}) by default.
from_signal_obs(sobs)
from_signal_obs(sobs, cgs)
Creates a signal from a signal observable and tags it with the given guarantees.
The assumption here is that the contexts have already been attached to the messages of the observable. This primitive can be used for guarantees with non-obvious contexts (other than e.g. counters) that the developer might come up with.
Attaches the given consistency guarantees to it if provided, without changing the context.
Otherwise attaches the globally defined consistency guarantee,
which is fifo with update semantics ({:fu, 0}) by default.
guarantees(arg)
Returns the guarantees of the given signal.
inspect(arg)
Inspects the given signal by printing its output values to the console.
keep_guarantee(arg1, cgt)
Keeps the given guarantee as the only guarantee of the given signal. Removes all other guarantees.
liftapp(s, func)
Lifts and applies an ordinary function to one or more signals.
Takes:
- A list of signals carrying any consistency guarantees
- A function whose number of arguments equals the number of signals.
Returns:
- A signal whose values are the results of applying the given function to the values of the input signals.
Output for the resulting signal is only created if a newly received message from an input signal can be combined with the rest of the buffer under the consistency guarantees of the different signals.
The resulting consistency guarantee of the output signal is a combination of the guarantees of the input signals.
When combining signals:
With update semantics:
- They have their last values kept as state until a more recent one is used.
- Each value is regarded as an update that may trigger a new output.
- This is similar to 'combine latest' in Reactive Extensions.
With propagate semantics:
- They have their used values kept only until they can be combined, at which point they are removed so they can't be used more than once.
- Each value is regarded as a value in a (time-series) data stream to be combined and propagated.
- This is similar to 'zip' in Reactive Extensions.
With both update as well as propagate semantics:
- New output is triggered by propagate-signals in steady state.
- New input for update signals is kept as state to be combined with.
- The first value of an update signal can trigger a series of outputs if there is a propagate history that has been waiting for this value to combine.
- This is similar to 'combine latest silent' (with buffered propagation) in Reactive Extensions (specifically in the Observables Extended library).
E.g.: c = a + b (with a, b and the resulting c all having {:fu, _} = update-fifo for guarantee)
a: 5 --------------------------------- 1 -->
b: ------------- 3 --------- 5 ------------>
c: -------------- 8 --------- 10 ------ 6 ->
E.g.: c = a + b (with a having {:fu, _}, b having {:fp, _}
and the resulting c having [{:fu, _}, {:fp, _}] for guarantee)
a: 5 ------------------------------------ 1
b: ------------- 3 ------ 5 -------------->
c: -------------- 8 ------ 10 ------------>
E.g.: c = a + b (with a, b and the resulting c all having {:fp, _} = propagate-fifo for guarantee)
a: 5 --------------------------------- 1 -->
b: ------------- 3 ---- 5 ----------------->
c: -------------- 8 ------------------- 6 ->
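The three FIFO diagrams above can be reproduced with a small model in plain Elixir. This is a sketch of the described semantics, not the library's implementation: `CombineSketch` and its functions are hypothetical names, events are `{source, value}` tuples in arrival order, and contexts are ignored.

```elixir
defmodule CombineSketch do
  # Both inputs have update semantics ("combine latest"):
  # the last value of each input is kept, and every new value may trigger output.
  def combine_latest(events, func) do
    {_latest, out} =
      Enum.reduce(events, {%{}, []}, fn {src, val}, {latest, out} ->
        latest = Map.put(latest, src, val)

        case latest do
          %{a: a, b: b} -> {latest, [func.(a, b) | out]}
          _ -> {latest, out}
        end
      end)

    Enum.reverse(out)
  end

  # Both inputs have propagate semantics ("zip"):
  # values wait in a queue and are removed once they have been combined.
  def zip(events, func) do
    {_queues, out} =
      Enum.reduce(events, {%{a: [], b: []}, []}, fn {src, val}, {queues, out} ->
        queues = Map.update!(queues, src, fn q -> q ++ [val] end)

        case queues do
          %{a: [a | rest_a], b: [b | rest_b]} ->
            {%{a: rest_a, b: rest_b}, [func.(a, b) | out]}

          _ ->
            {queues, out}
        end
      end)

    Enum.reverse(out)
  end

  # a has update semantics, b has propagate semantics.
  # Simplification: nil marks "a has not produced a value yet".
  def mixed(events, func) do
    {_state, out} =
      Enum.reduce(events, {{nil, []}, []}, fn
        {:a, val}, {{_latest, pending}, out} ->
          # A waiting history of b values fires as soon as a value of a is known.
          fired = Enum.map(pending, fn b -> func.(val, b) end)
          {{val, []}, Enum.reverse(fired) ++ out}

        {:b, val}, {{nil, pending}, out} ->
          {{nil, pending ++ [val]}, out}

        {:b, val}, {{latest, pending}, out} ->
          {{latest, pending}, [func.(latest, val) | out]}
      end)

    Enum.reverse(out)
  end
end

events = [a: 5, b: 3, b: 5, a: 1]
add = fn x, y -> x + y end

IO.inspect(CombineSketch.combine_latest(events, add))  # [8, 10, 6]
IO.inspect(CombineSketch.mixed(events, add))           # [8, 10]
IO.inspect(CombineSketch.zip(events, add))             # [8, 6]
```

The same event sequence yields three different outputs, one per diagram, which is the point of choosing between update and propagate semantics.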
E.g.: c = a + b (with a, b and the resulting c all having {:t, 0} = strict time-synchronization for guarantee)
a: 5(2) ----------- 3(3) -------------- 1(4) -------------->
b: ---------- 4(1) ------------ 5(2) -------------- 3(3) -->
c: ----------------------------- 10(2) --------------6(3) ->
E.g.: c = a + b (with a, b and the resulting c all having {:t, 1} = relaxed time-synchronization for guarantee)
a: 5(2) -------------- 3(3) ------------- 1(4) ------------>
b: ---------- 4(1) -------------- 5(2) ------------- 3(3) ->
c: ----------- 9(1,2) ------------- 8(2,3) ---------- 4(3,4)
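The two time-synchronization diagrams above can be approximated with the following model, offered as a sketch under stated assumptions rather than as the library's algorithm. It assumes messages of the form `{source, value, timestamp}`, buffers every message without ever evicting it, and pairs a new message with the newest buffered message of the other input whose timestamp differs by at most the margin. `TimeSyncSketch` is a hypothetical name.

```elixir
defmodule TimeSyncSketch do
  # margin 0 models {:t, 0} (strict), margin 1 models {:t, 1} (relaxed).
  def combine(events, margin, func) do
    {_buffers, out} =
      Enum.reduce(events, {%{a: [], b: []}, []}, fn {src, val, t}, {buffers, out} ->
        other = if src == :a, do: :b, else: :a

        # Buffers are kept newest-first, so Enum.find/2 returns the newest match.
        match = Enum.find(buffers[other], fn {_v, t2} -> abs(t - t2) <= margin end)
        buffers = Map.update!(buffers, src, fn buf -> [{val, t} | buf] end)

        case match do
          nil -> {buffers, out}
          {v2, _t2} when src == :a -> {buffers, [func.(val, v2) | out]}
          {v2, _t2} -> {buffers, [func.(v2, val) | out]}
        end
      end)

    Enum.reverse(out)
  end
end

# Arrival order taken from the diagrams: a 5(2), b 4(1), a 3(3), b 5(2), a 1(4), b 3(3)
events = [{:a, 5, 2}, {:b, 4, 1}, {:a, 3, 3}, {:b, 5, 2}, {:a, 1, 4}, {:b, 3, 3}]
add = fn x, y -> x + y end

IO.inspect(TimeSyncSketch.combine(events, 0, add))  # [10, 6]
IO.inspect(TimeSyncSketch.combine(events, 1, add))  # [9, 8, 4]
```

A real implementation would need to bound the buffers; the sketch omits this to keep the matching rule visible.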
E.g.: c = a + b (with a, b and the resulting c all having {:c, 0} = strict causality for guarantee)
a: 5(x2,y2) --------------------------------------- 2(x3,y3)
b: -------- 3(x1) ---- 3(x2) --- 8(x3) --- 7(x4) ---------->
c: -------------------- 8(..) --- 13(..) --- 12(..) -- 9(..)
(For more information: consult the DREAM academic paper by Salvaneschi et al.)
E.g.: d = a + b + c (with a, b, c and the resulting d all having {:g, 0} = strict glitch freedom for guarantee)
a: 5(x2) ---------------------------------------- 2(x3) --->
b: ----- 3(x1) ------ 3(x2) -- 8(x3) ---------------------->
c: ------------ 7(y5) -------------- 4(y6)----------------->
d: ------------------- 15(x2,y5) ---- 12(x2,y6) -- 14(x3,y6)
(For more information: consult the QUARP and/or DREAM academic paper)
E.g.: d = a + b + c (with a, b having {:g, 0}, c having {:t, 0} and the resulting d having [{:g, 0}, {:t, 0}] for guarantee)
a: 5(x2) --------------------------------------------- 2(x3)
b: ------- 3(x1) ---------- 3(x2) --- 8(x3) --------------->
c: --------------- 7(5) --------------------- 4(6) -------->
d: ------------------------- 15(x2,5) -------- 12(x2,6) --->
E.g.: c = a + b (with a having [{:g, 0}, {:t, 0}], b having {:g, 0} and the resulting c having [{:g, 0}, {:t, 0}] for guarantee)
a: 5(x2,1) ---------------------- 7(x2,2) -------- 6(x2,3)->
b: ------- 3(x1) ---- 4(x2) --------------- 7(x3) --------->
c: ------------------- 9(x2,1) --- 11(x2,2) ------- 10(x2,3)
(with the resulting guarantees of c being: {:g, 0}, {:t, 0})
liftapp_var(ss, arg, func)
Lifts and applies a function that operates on lists of variable size to a list of signals that can be subject to change.
Takes:
- A list of signals, each carrying the same guarantee g (which may consist of several guarantees).
- A higher order signal carrying new signals of guarantee g.
- A function operating on lists of any size.
E.g.: b = Enum.sum(as) / length(as) (with all a's having {:fu, _} for guarantee) and h a signal carrying new signals.
h : ----------------- a3 --------------------- a4 -------->
a1: 5 ------------------------------///////////////////////
a2: ------------- 3 ----------------------- 6 ------------>
a3: //////////////////------ 7 --------------------------->
a4: ////////////////////////////////////////////---- 2 --->
...
b : -------------- 4 -------- 5 ------------ 6 ------ 5 -->
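The numbers in the diagram above can be reproduced with a simplified model in plain Elixir. It is a sketch only: `LiftappVarSketch` and the event shapes are hypothetical, all signals are treated as having update semantics, output is produced only once every known signal has a value, and signals are never removed (so the `/////` part of a1 is not modeled).

```elixir
defmodule LiftappVarSketch do
  # Events, in arrival order:
  #   {:new_signal, name}  the higher-order signal announces a new input signal
  #   {:value, name, v}    an input signal produces a value
  def run(initial_names, events, func) do
    state = Map.new(initial_names, fn name -> {name, nil} end)

    {_state, out} =
      Enum.reduce(events, {state, []}, fn
        {:new_signal, name}, {state, out} ->
          {Map.put(state, name, nil), out}

        {:value, name, v}, {state, out} ->
          state = Map.put(state, name, v)
          values = Map.values(state)

          # Only produce output when every known signal has a value.
          if Enum.any?(values, &is_nil/1) do
            {state, out}
          else
            {state, [func.(values) | out]}
          end
      end)

    Enum.reverse(out)
  end
end

events = [
  {:value, :a1, 5},
  {:value, :a2, 3},
  {:new_signal, :a3},
  {:value, :a3, 7},
  {:value, :a2, 6},
  {:new_signal, :a4},
  {:value, :a4, 2}
]

average = fn xs -> Enum.sum(xs) / length(xs) end

IO.inspect(LiftappVarSketch.run([:a1, :a2], events, average))
# [4.0, 5.0, 6.0, 5.0]
```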
merge(signals)
merge(signals, new_cg)
Merges multiple signals together such that the resulting signal carries the updates of all composed signals.
If no consistency guarantee is provided, the merge leaves the updates 'as is'. A necessary condition for this operation to be valid is then that the given signals all carry the same guarantee. The consequences of using this operator in this way are left to the developer.
If, however, a consistency guarantee is provided, this new guarantee is attached to the resulting signal, discarding the previous ones. Merging in this way can thus be considered the creation of a new source signal for this guarantee in a stratified dependency graph.
register(signal, name)
Publishes a signal by registering it in the registry.
remove_guarantee(arg1, cgt)
Removes the given guarantee from the given signal. Leaves the signal alone if the guarantee is not present.
scan(arg, func, default \\ nil)
Applies a given procedure to a signal's value and its previous result. Works in the same way as the Enum.scan function:
Enum.scan(1..10, fn(x,y) -> x + y end) => [1, 3, 6, 10, 15, 21, 28, 36, 45, 55]
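For comparison, the standard library offers both an unseeded and a seeded variant of `Enum.scan`. Whether the `default` argument of this DSL function plays the role of the seed in `Enum.scan/3` is an assumption here, not something the text above states; the snippet only shows the behaviour of the two `Enum` functions.

```elixir
values = [1, 2, 3, 4, 5]

# Enum.scan/2: the first element is emitted unchanged and seeds the accumulator.
running_sum = Enum.scan(values, fn x, acc -> x + acc end)

# Enum.scan/3: an explicit initial accumulator (here 10) is combined with every element.
seeded_sum = Enum.scan(values, 10, fn x, acc -> x + acc end)

IO.inspect(running_sum)  # [1, 3, 6, 10, 15]
IO.inspect(seeded_sum)   # [11, 13, 16, 20, 25]
```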
set_guarantee(arg, cg)
Sets the given consistency guarantee as the only guarantee for the given signal.
This can be considered the creation of a new source signal from another signal in a stratified dependency graph.
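The difference between adding, removing, keeping and setting guarantees can be pictured on a bare list of guarantee tuples. This is a hypothetical model: `GuaranteeSketch` is not part of the DSL, it ignores the contexts attached to messages, and it takes full `{type, margin}` tuples everywhere, whereas the `cgt` parameter of the DSL functions may well refer to a guarantee type only.

```elixir
defmodule GuaranteeSketch do
  # Guarantees are modeled as {type, margin} tuples, e.g. {:g, 0} or {:t, 1}.

  # Attach one more guarantee to the existing ones.
  def add(cgs, cg), do: cgs ++ [cg]

  # Check whether a guarantee is present.
  def carries?(cgs, cg), do: cg in cgs

  # Remove a guarantee; the list is returned unchanged if it is absent.
  def remove(cgs, cg), do: List.delete(cgs, cg)

  # Keep only the given guarantee out of the existing ones.
  def keep(cgs, cg), do: Enum.filter(cgs, fn existing -> existing == cg end)

  # Replace all guarantees with the given one, whether or not it was present.
  def set(_cgs, cg), do: [cg]
end

cgs = GuaranteeSketch.add([{:g, 0}], {:t, 0})

IO.inspect(cgs)                                    # [g: 0, t: 0]
IO.inspect(GuaranteeSketch.remove(cgs, {:t, 0}))   # [g: 0]
IO.inspect(GuaranteeSketch.keep(cgs, {:t, 0}))     # [t: 0]
IO.inspect(GuaranteeSketch.set(cgs, {:c, 0}))      # [c: 0]
```

Elixir prints lists of `{atom, value}` tuples in keyword-list notation, hence `[g: 0, t: 0]` for `[{:g, 0}, {:t, 0}]`.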
signal(name)
Gets a signal from the registry by its name.
to_plain_obs(arg)
Transforms a signal into a plain observable.
to_signal_obs(arg)
Transforms a signal into a signal observable, meaning that both the value and context {v, c} of each observable message are preserved.
unregister(name)
Unregisters a signal from the registry by its name.