CSPEx v2.0.0-beta CSP.Channel
Module used to create and manage channels.
Options
Some options change a channel's behavior. A channel's options can only be set during its creation.
The available options are:
name - Registers the channel process with a name. Note that the naming constraints are the same as those applied to a GenServer.
buffer - A struct that implements the CSP.Buffer protocol. This library ships with three implementations: CSP.Buffer.Blocking, CSP.Buffer.Dropping and CSP.Buffer.Sliding. Check out their documentation for more information on how each one works.
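As a minimal sketch of the buffer option, the snippet below contrasts a channel created with no options against one created with a blocking buffer. It assumes the library is already a dependency of your project, and it uses only the Channel.new, Channel.put, Channel.get and CSP.Buffer.Blocking.new calls that appear elsewhere on this page. The buffer size of 2 is an arbitrary choice for illustration.

```elixir
alias CSP.{Buffer, Channel}

# A channel created without options has no buffer, so a put can
# block until another process reads the value.
_unbuffered = Channel.new()

# With a blocking buffer of size 2 (an arbitrary size picked for this
# sketch), two values can be put before anyone reads from the channel.
buffered = Channel.new(buffer: Buffer.Blocking.new(2))
Channel.put(buffered, :a)
Channel.put(buffered, :b)

# Values come out in the order they were put.
first = Channel.get(buffered)
```

Because options are fixed at creation time, choosing the buffer is a decision to make up front rather than something to tune later on a live channel.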
Using as a collection
The CSP.Channel struct has underpinnings for working alongside the
Stream and Enum modules. You can use a channel directly as a Collectable:
channel = Channel.new()
Enum.into([:some, :data], channel)
channel = Channel.new()
for x <- 1..4, into: channel do
  x * x
end
And you can also use the channel as an Enumerable:
channel = Channel.new()
Enum.take(channel, 2)
channel = Channel.new()
for x <- channel do
  x * x
end
Keep in mind that, just like the put and get operations, these can block.
One trick is to spin up a separate process to take care of feeding or
reading the channel:
channel = Channel.new()
spawn_link(fn -> Enum.into([:some, :data], channel) end)
Enum.take(channel, 2) # => [:some, :data]
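The same trick works in the other direction. The sketch below moves the reader into a separate process and feeds the channel from the current one. It assumes the library is available as a dependency; Task.async and Task.await come from Elixir's standard library.

```elixir
alias CSP.Channel

channel = Channel.new()

# Start the reader first. It blocks inside the task until two
# values have been put into the channel.
reader = Task.async(fn -> Enum.take(channel, 2) end)

# Feed the channel from the current process.
Channel.put(channel, :some)
Channel.put(channel, :data)

# Collect what the reader saw.
result = Task.await(reader)
```

Using a Task for the reading side makes it easy to get the collected values back in the calling process.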
In the next section we will discuss some of the gotchas of using channels as Collectables/Enumerables.
As a Collectable
Every element that is fed into the channel causes a put operation, so be sure
that there will be someone reading from your channel or that your channel has
a buffer big enough to accommodate the incoming events.
As an Enumerable
Every element that is read from the channel causes a get operation, so be sure
that there will be someone adding values to your channel.
A thing to keep in mind while using channels as Enumerables is that they act like
infinite streams, so eager functions that consume the whole stream
(like Enum.map/2 or Enum.each/2) will only return when the channel is closed
(see CSP.Channel.close/1).
Unless that is exactly what you want, use functions from the Stream module to
build your processing pipeline and finish them with something like Enum.take/2
or Enum.take_while/2.
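A minimal sketch of that advice, assuming the library is available as a dependency: the pipeline below is built lazily with Stream.map/2 and finished with Enum.take/2, so it returns after three values even though the channel is never closed.

```elixir
alias CSP.Channel

channel = Channel.new()

# Feed three values from another process. The channel stays open.
spawn_link(fn -> Enum.into(1..3, channel) end)

# Stream.map/2 is lazy, so nothing is read until Enum.take/2 asks
# for values, and reading stops once three results are collected.
squares =
  channel
  |> Stream.map(&(&1 * &1))
  |> Enum.take(3)

# squares is [1, 4, 9]
```

Replacing Stream.map/2 with Enum.map/2 here would keep waiting for more values, because the channel is never closed.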
Another caveat of using channels as Enumerables is that filtering with
Stream.filter/2 or Enum.filter/2 does not simply filter what you are going
to read from the channel. It reads the values and then discards the rejected
ones.
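The following sketch illustrates that caveat, assuming the library is available as a dependency. To produce two even numbers, the pipeline has to read the odd values in front of them as well, and those values are dropped.

```elixir
alias CSP.Channel

channel = Channel.from_enumerable(1..6)

# To find two even numbers the pipeline reads 1, 2, 3 and 4 from the
# channel. The rejected values 1 and 3 are discarded, not put back.
evens =
  channel
  |> Stream.filter(&(rem(&1, 2) == 0))
  |> Enum.take(2)

# evens is [2, 4]
```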
If you just want to direct the filtered results elsewhere or partition the
events between multiple consumers, use CSP.Channel.partition/2 and
CSP.Channel.partition/3 respectively.
Almost the same thing would happen with Enum.count/1 and Enum.member?/2,
as the values would be read from the channel to get the result and be discarded
afterwards. As an operation like this should not be done on a channel, an error
is raised if those functions (and others similar to them) are called with a
channel.
Using the Collectable/Enumerable implementation you can get some nice results, like this implementation of a parallel map over a channel:
defmodule ChannelExtensions do
  alias CSP.Channel

  # Receives a channel, the number of workers
  # and a function to be called on each value.
  # Returns a channel with the results.
  def pmap(source, workers, fun) do
    # Create the results channel and a temporary channel just for cleanup.
    results = Channel.new()
    done = Channel.new()

    # Spin up the number of workers passed as argument.
    # Each worker streams values from one channel
    # to the other, passing each value to the function.
    # After the source is depleted, each worker puts
    # a message in the done channel.
    for _ <- 1..workers do
      Task.start_link(fn ->
        for value <- source, into: results, do: fun.(value)
        Channel.put(done, true)
      end)
    end

    # Spin up a cleanup process that blocks until all
    # the workers put a message in the done channel,
    # then closes both the done and the results channels.
    Task.start_link(fn ->
      Enum.take(done, workers)
      Channel.close(done)
      Channel.close(results)
    end)

    results
  end
end
OTP Compatibility
Since channels are just GenServers, you can use a channel in a supervision tree:
alias CSP.{
  Buffer,
  Channel
}
children = [
  {Channel, name: MyApp.Channel, buffer: Buffer.Blocking.new(10)}
]
{:ok, pid} = Supervisor.start_link(children, strategy: :one_for_one)
You can use all the functions with the registered name instead of the channel struct:
Channel.put(MyApp.Channel, :data)
Channel.put(MyApp.Channel, :other)
Channel.get(MyApp.Channel) #=> :data
Channel.get(MyApp.Channel) #=> :other
If you want to use it as a channel struct just call CSP.Channel.wrap/1:
channel = Channel.wrap(MyApp.Channel)
Enum.into(1..4, channel)
Summary
Functions
Function responsible for closing a channel
Returns true if the channel is closed or false otherwise
Creates a channel based on the given enumerable
Creates a channel based on the given enumerables
Function responsible for fetching a value from the channel
Function responsible for creating a new channel
Partitions a channel in two, according to the provided function
Partitions events from one channel to an arbitrary number of other channels, according to the partitions definition and the hashing function
Function responsible for putting a value in the channel
Non-linking version of CSP.Channel.start_link/1
Function responsible for the starting of the channel
Creates a channel that buffers events from a source channel
Wraps the PID or registered name in a Channel struct
Types
option() ::
{:buffer, CSP.Buffer.t()}
| {:buffer_size, non_neg_integer()}
| {:buffer_type, buffer_type()}
| {:name, GenServer.name()}
Functions
Function responsible for closing a channel.
Example
iex> channel = Channel.new()
iex> Channel.closed?(channel)
false
iex> Channel.close(channel)
iex> Channel.closed?(channel)
true
Returns true if the channel is closed or false otherwise.
Creates a channel based on the given enumerable.
The created channel is closed when the enumerable is depleted.
Example
iex> channel = Channel.from_enumerable(1..4)
iex> Enum.to_list(channel)
[1, 2, 3, 4]
Creates a channel based on the given enumerables.
The created channel is closed when the provided enumerables are depleted.
Example
iex> channel = Channel.from_enumerables([1..4, 5..8])
iex> Enum.to_list(channel)
[1, 2, 3, 4, 5, 6, 7, 8]
Function responsible for fetching a value from the channel.
It will block until a value is inserted in the channel or it is closed.
Always returns nil when the channel is closed.
Example
iex> channel = Channel.new()
iex> spawn_link(fn -> Channel.put(channel, :data) end)
iex> Channel.get(channel)
:data
iex> Channel.close(channel)
iex> Channel.get(channel)
nil
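Since get returns nil once the channel is closed, nil can serve as an end-of-channel marker in a manual read loop. The sketch below assumes the library is available as a dependency; the Drain module and its all/2 function are hypothetical names introduced only for this illustration.

```elixir
defmodule Drain do
  # Hypothetical helper, not part of the library.
  alias CSP.Channel

  # Reads values until get returns nil, which signals a closed channel.
  def all(channel, acc \\ []) do
    case Channel.get(channel) do
      nil -> Enum.reverse(acc)
      value -> all(channel, [value | acc])
    end
  end
end

# from_enumerable closes the channel once the range is depleted,
# so the loop terminates.
values = Drain.all(CSP.Channel.from_enumerable(1..3))
```

In most cases Enum.to_list/1 on the channel does the same job; the explicit loop is mainly useful when each value needs custom handling.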
Function responsible for creating a new channel.
Useful for using channels outside of a supervision tree.
Example
iex> channel = Channel.new()
iex> spawn_link(fn -> Channel.put(channel, :data) end)
iex> Channel.get(channel)
:data
Partitions a channel in two, according to the provided function.
The created channels are closed when the source channel is closed.
Important
This function expects that you are going to simultaneously read from both channels, as an event that is stuck in one of them will block the other.
If you just want to discard values from a channel, use Stream.filter/2 or
Stream.reject/2.
Example
iex> require Integer
iex> channel = Channel.from_enumerable(1..4)
iex> {even, odd} = Channel.partition(channel, &Integer.is_even/1)
iex> Channel.get(odd)
1
iex> Channel.get(even)
2
iex> Channel.get(odd)
3
iex> Channel.get(even)
4
Partitions events from one channel to an arbitrary number of other channels, according to the partitions definition and the hashing function.
Returns a map with the partition as the key and the partition channel as the value.
All the created channels are closed when the source channel is closed.
Important
This function expects that you are going to simultaneously read from all channels, as an event that is stuck in one of them will block the others.
Example
iex> channel = Channel.from_enumerable(1..10)
iex> partitions = Channel.partition(channel, 0..3, &rem(&1, 4))
iex> partitions
...> |> Enum.map(fn {key, channel} -> {key, Channel.with_buffer(channel, 5)} end)
...> |> Enum.map(fn {key, channel} -> {key, Enum.to_list(channel)} end)
[{0, [4, 8]}, {1, [1, 5, 9]}, {2, [2, 6, 10]}, {3, [3, 7]}]
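As an alternative to buffering each partition, the sketch below reads all partitions at the same time by giving each one its own task. It assumes the library is available as a dependency; Task.async and Task.await come from Elixir's standard library.

```elixir
alias CSP.Channel

channel = Channel.from_enumerable(1..10)
partitions = Channel.partition(channel, 0..3, &rem(&1, 4))

# Start one reader per partition so that no partition blocks the others,
# then wait for every reader and collect the results by partition key.
results =
  partitions
  |> Enum.map(fn {key, part} -> {key, Task.async(fn -> Enum.to_list(part) end)} end)
  |> Map.new(fn {key, task} -> {key, Task.await(task)} end)
```

All readers are started before any of them is awaited, which is what keeps the partitions draining simultaneously.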
Function responsible for putting a value in the channel.
It may block until a value is fetched, depending on the buffer type of the channel.
Raises if trying to put nil or if trying to put anything in a closed channel.
Example
iex> channel = Channel.new(buffer: CSP.Buffer.Blocking.new(5))
iex> Channel.put(channel, :data)
iex> Channel.put(channel, :other)
iex> Enum.take(channel, 2)
[:data, :other]
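A hedged sketch of guarding against the raise when a channel might already be closed. It assumes the library is available as a dependency. The exact exception type is not stated here, so the rescue clause below catches any exception instead of naming one.

```elixir
alias CSP.Channel

channel = Channel.new()
Channel.close(channel)

# Putting into a closed channel raises, so wrap the call when the
# channel state is not known in advance.
result =
  try do
    Channel.put(channel, :data)
  rescue
    # The exception type is an unknown here, so rescue anything.
    error -> {:error, Exception.message(error)}
  end
```

Checking Channel.closed?/1 beforehand is another option, although the channel could still be closed between the check and the put.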
Non-linking version of CSP.Channel.start_link/1.
start_link(options()) :: GenServer.on_start()
Function responsible for the starting of the channel.
Ideal for using a channel in a supervision tree.
with_buffer(t(), non_neg_integer()) :: t()
Creates a channel that buffers events from a source channel.
The created channel is closed when the source channel is closed.
Example
iex> unbuffered = Channel.new()
iex> buffered = Channel.with_buffer(unbuffered, 5)
iex> Enum.into(1..5, unbuffered)
iex> Enum.take(buffered, 5)
[1, 2, 3, 4, 5]
Wraps the PID or registered name in a Channel struct.
If the passed in value is already a Channel struct, return it unchanged.
Example
iex> {:ok, pid} = Channel.start_link(buffer: CSP.Buffer.Blocking.new(5))
iex> channel = Channel.wrap(pid)
iex> Enum.into(1..5, channel)
iex> Channel.close(channel)
iex> Enum.to_list(channel)
[1, 2, 3, 4, 5]
iex> channel = Channel.new()
iex> channel == Channel.wrap(channel)
true