CSPEx v2.0.0-beta CSP.Channel

Module used to create and manage channels.

Options

Some options change a channel's behavior, but they can only be set when the channel is created.

The available options, as listed in the option() type, are :buffer, :buffer_size, :buffer_type and :name.
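As a rough illustration, options are passed as a keyword list at creation time. This sketch assumes the CSPEx library is compiled and loaded. The buffer: form mirrors the CSP.Buffer.Blocking.new/1 calls used elsewhere on this page; the buffer_type:/buffer_size: form is inferred only from the option() type, so its exact behavior is an assumption.

```elixir
# Assumes the CSPEx library is compiled and loaded (for example in `iex -S mix`).
alias CSP.{Buffer, Channel}

# Pass a ready-made buffer struct through the :buffer option.
buffered = Channel.new(buffer: Buffer.Blocking.new(2))

# Assumption: a buffer can also be described with :buffer_type and
# :buffer_size, as the option() type suggests.
described = Channel.new(buffer_type: :blocking, buffer_size: 2)

# Freshly created channels are open.
buffered_closed? = Channel.closed?(buffered)
described_closed? = Channel.closed?(described)
```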

Using as a collection

The CSP.Channel struct has underpinnings for working alongside the Stream and Enum modules. You can use a channel directly as a Collectable:

channel = Channel.new()
Enum.into([:some, :data], channel)

channel = Channel.new()
for x <- 1..4, into: channel do
  x * x
end

And you can also use the channel as an Enumerable:

channel = Channel.new()
Enum.take(channel, 2)

channel = Channel.new()
for x <- channel do
  x * x
end

Be mindful that, just like the put and get operations, these can block. One trick is to spin up a process to take care of feeding or reading the channel:

channel = Channel.new()
spawn_link(fn -> Enum.into([:some, :data], channel) end)
Enum.take(channel, 2) # => [:some, :data]

In the next section we will discuss some of the gotchas of using channels as Collectables/Enumerables.

As a Collectable

Every element that is fed into the channel causes a put operation, so be sure that there will be someone reading from your channel or that your channel has a buffer big enough to accommodate the incoming events.
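A minimal sketch of the buffered case, assuming the CSPEx library is loaded: with a blocking buffer large enough for the whole collection, Enum.into/2 can return before any reader shows up. The buffer size of 3 is an arbitrary choice for the illustration.

```elixir
# Assumes the CSPEx library is compiled and loaded.
alias CSP.{Buffer, Channel}

# A blocking buffer with room for three values lets Enum.into/2
# return without a reader waiting on the other side.
channel = Channel.new(buffer: Buffer.Blocking.new(3))
Enum.into([:a, :b, :c], channel)

# The buffered values are still there for a later reader.
received = Enum.take(channel, 3)
```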

As an Enumerable

Every element that is read from the channel causes a get operation, so be sure that there will be someone adding values to your channel.

A thing to keep in mind while using channels as Enumerables is that they act like infinite streams, so eager functions that consume the whole stream (like Enum.map/2 or Enum.each/2) will only return when the channel is closed (see CSP.Channel.close/1).

Unless that is exactly what you want, use functions from the Stream module to build your processing pipeline and finish them with something like Enum.take/2 or Enum.take_while/2.
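The difference between the eager and the lazy style can be sketched as follows, assuming the CSPEx library is loaded. The variable names are illustrative only.

```elixir
# Assumes the CSPEx library is compiled and loaded.
alias CSP.Channel

# Eager: Enum.map/2 keeps reading until the channel is closed,
# so the producer closes the channel once it has put every value.
eager_source = Channel.new()

spawn_link(fn ->
  Enum.into(1..3, eager_source)
  Channel.close(eager_source)
end)

eager = Enum.map(eager_source, &(&1 * 10))

# Lazy: Stream.map/2 does no reading by itself; Enum.take/2 pulls
# three values and then stops, leaving the rest unread.
lazy_source = Channel.from_enumerable(1..100)

lazy =
  lazy_source
  |> Stream.map(&(&1 * 10))
  |> Enum.take(3)
```

Without the Channel.close/1 call in the producer, the Enum.map/2 call would keep waiting for more values.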

Another caveat of using channels as Enumerables is that filtering with Stream.filter/2 or Enum.filter/2 does not simply filter what you are going to read from the channel. It reads the values and then discards the rejected ones.

If you just want to direct the filtered results elsewhere or partition the events between multiple consumers, use CSP.Channel.partition/2 and CSP.Channel.partition/3 respectively.
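To make the filtering caveat concrete, here is a small sketch, assuming the CSPEx library is loaded. It only shows the consuming side of Stream.filter/2.

```elixir
# Assumes the CSPEx library is compiled and loaded.
require Integer
alias CSP.Channel

channel = Channel.from_enumerable(1..6)

# Every value the filter inspects is read from the channel;
# only the even ones are kept, the odd ones are dropped for good.
evens =
  channel
  |> Stream.filter(&Integer.is_even/1)
  |> Enum.take(2)
```

The odd values read along the way are gone, so no other consumer of the channel will ever see them.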

Almost the same thing would happen with Enum.count/1 and Enum.member?/2, as the values would be read from the channel to compute the result and then discarded. Since operations like these should not be performed on a channel, an error is raised if those functions (and others similar to them) are called with a channel.
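A hedged sketch of guarding against that error, assuming the CSPEx library is loaded. The exception type is not stated on this page, so the sketch rescues any exception rather than naming one.

```elixir
# Assumes the CSPEx library is compiled and loaded.
alias CSP.Channel

channel = Channel.new()

# The exact exception type is an unknown here, so any exception
# is rescued and only its message is kept.
outcome =
  try do
    {:ok, Enum.count(channel)}
  rescue
    error -> {:error, Exception.message(error)}
  end
```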

Using the Collectable/Enumerable implementation you can get some nice results, like this implementation of a parallel map over a channel:

defmodule ChannelExtensions do
  alias CSP.Channel

  # Receive a channel, the number of workers
  # and a function to be called on each value.
  # Returns a channel with the results.
  def pmap(source, workers, fun) do
    # Create the results channel and a temporary channel just for cleanup.
    results = Channel.new()
    done = Channel.new()

    # Spin up the number of workers passed as argument.
    # Each worker streams values from one channel
    # to the other, passing each to the function.
    # After the source is depleted, each worker puts
    # a message in the done channel.
    for _ <- 1..workers do
      Task.start_link(fn ->
        for value <- source, into: results, do: fun.(value)

        Channel.put(done, true)
      end)
    end

    # Spin up a cleanup process that blocks until all
    # the workers put a message in the done channel,
    # then closes both the done and the results channels.
    Task.start_link(fn ->
      Enum.take(done, workers)

      Channel.close(done)
      Channel.close(results)
    end)

    results
  end
end

OTP Compatibility

Since channels are just GenServers, you can use a channel in a supervision tree:

alias CSP.{
  Buffer,
  Channel
}

children = [
  {Channel, name: MyApp.Channel, buffer: Buffer.Blocking.new(10)}
]

{:ok, pid} = Supervisor.start_link(children, strategy: :one_for_one)

You can use all the functions with the registered name instead of the channel struct:

Channel.put(MyApp.Channel, :data)
Channel.put(MyApp.Channel, :other)

Channel.get(MyApp.Channel) #=> :data
Channel.get(MyApp.Channel) #=> :other

If you want to use it as a channel struct just call CSP.Channel.wrap/1:

channel = Channel.wrap(MyApp.Channel)
Enum.into(1..4, channel)

Summary

Functions

Function responsible for closing a channel

Returns true if the channel is closed or false otherwise

Creates a channel based on the given enumerable

Creates a channel based on the given enumerables

Function responsible for fetching a value of the channel

Function responsible for creating a new channel

Partitions a channel in two, according to the provided function

Partitions events from one channel to an arbitrary number of other channels, according to the partitions definition and the hashing function

Function responsible for putting a value in the channel

Function responsible for the starting of the channel

Creates a channel that buffers events from a source channel

Wraps the PID or registered name in a Channel struct

Types

buffer_type()
buffer_type() :: :blocking | :sliding | :dropping

channel_ref()
channel_ref() :: term() | t()

option()
option() ::
  {:buffer, CSP.Buffer.t()}
  | {:buffer_size, non_neg_integer()}
  | {:buffer_type, buffer_type()}
  | {:name, GenServer.name()}

options()
options() :: [option()]

t()
t() :: %CSP.Channel{ref: term()}

Functions

close/1

Function responsible for closing a channel.

Example

iex> channel = Channel.new()
iex> Channel.closed?(channel)
false
iex> Channel.close(channel)
iex> Channel.closed?(channel)
true

closed?/1

Returns true if the channel is closed or false otherwise.

from_enumerable(enum)
from_enumerable(Enum.t()) :: t()

Creates a channel based on the given enumerable.

The created channel is closed when the enumerable is depleted.

Example

iex> channel = Channel.from_enumerable(1..4)
iex> Enum.to_list(channel)
[1, 2, 3, 4]

from_enumerables(enums)
from_enumerables([Enum.t()]) :: t()

Creates a channel based on the given enumerables.

The created channel is closed when the provided enumerables are depleted.

Example

iex> channel = Channel.from_enumerables([1..4, 5..8])
iex> Enum.to_list(channel)
[1, 2, 3, 4, 5, 6, 7, 8]

get/1

Function responsible for fetching a value of the channel.

It will block until a value is inserted in the channel or it is closed.

Always returns nil when the channel is closed.
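Because nil marks a closed channel, a consumer can loop on get until it sees nil. The sketch below assumes the CSPEx library is loaded; ChannelDrain is a hypothetical helper written for this illustration and is not part of CSPEx.

```elixir
# Assumes the CSPEx library is compiled and loaded.
alias CSP.Channel

defmodule ChannelDrain do
  # Hypothetical helper, not part of CSPEx: collects values with
  # CSP.Channel.get/1 until nil signals that the channel is closed.
  def to_list(channel, acc \\ []) do
    case CSP.Channel.get(channel) do
      nil -> Enum.reverse(acc)
      value -> to_list(channel, [value | acc])
    end
  end
end

# from_enumerable/1 closes the channel once the range is depleted,
# which is what ends the loop.
channel = Channel.from_enumerable(1..3)
drained = ChannelDrain.to_list(channel)
```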

Example

iex> channel = Channel.new()
iex> spawn_link(fn -> Channel.put(channel, :data) end)
iex> Channel.get(channel)
:data
iex> Channel.close(channel)
iex> Channel.get(channel)
nil

new(options \\ [])
new(options()) :: t()

Function responsible for creating a new channel.

Useful for using channels outside of a supervision tree.

Example

iex> channel = Channel.new()
iex> spawn_link(fn -> Channel.put(channel, :data) end)
iex> Channel.get(channel)
:data

partition(source, fun)
partition(t(), (term() -> boolean())) :: {t(), t()}

Partitions a channel in two, according to the provided function.

The created channels are closed when the source channel is closed.

Important

This function expects that you are going to simultaneously read from both channels, as an event that is stuck in one of them will block the other.

If you just want to discard values from a channel, use Stream.filter/2 or Stream.reject/2.
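One way to read from both channels at the same time is to give each its own process. This is a sketch, assuming the CSPEx library is loaded; using Task.async/1 and Task.await/1 here is a choice made for the illustration, not something the library requires.

```elixir
# Assumes the CSPEx library is compiled and loaded.
require Integer
alias CSP.Channel

source = Channel.from_enumerable(1..6)
{even, odd} = Channel.partition(source, &Integer.is_even/1)

# Each partition gets its own reader, so a value waiting in one
# channel never holds up the other.
even_task = Task.async(fn -> Enum.to_list(even) end)
odd_task = Task.async(fn -> Enum.to_list(odd) end)

# Both lists complete once the source is depleted and the
# partition channels are closed.
evens = Task.await(even_task)
odds = Task.await(odd_task)
```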

Example

iex> require Integer
iex> channel = Channel.from_enumerable(1..4)
iex> {even, odd} = Channel.partition(channel, &Integer.is_even/1)
iex> Channel.get(odd)
1
iex> Channel.get(even)
2
iex> Channel.get(odd)
3
iex> Channel.get(even)
4

partition(source, partitions, hashing_fun)
partition(t(), Enum.t(), (term() -> term())) :: %{optional(term()) => t()}

Partitions events from one channel to an arbitrary number of other channels, according to the partitions definition and the hashing function.

Returns a map with the partition as the key and the partition channel as the value.

All the created channels are closed when the source channel is closed.

Important

This function expects that you are going to simultaneously read from all channels, as an event that is stuck in one of them will block the others.

Example

iex> channel = Channel.from_enumerable(1..10)
iex> partitions = Channel.partition(channel, 0..3, &rem(&1, 4))
iex> partitions
...> |> Enum.map(fn {key, channel} -> {key, Channel.with_buffer(channel, 5)} end)
...> |> Enum.map(fn {key, channel} -> {key, Enum.to_list(channel)} end)
[{0, [4, 8]}, {1, [1, 5, 9]}, {2, [2, 6, 10]}, {3, [3, 7]}]

put(channel, item)
put(channel_ref(), term()) :: :ok

Function responsible for putting a value in the channel.

It may block until a value is fetched, depending on the buffer type of the channel.

Raises if trying to put nil or if trying to put anything in a closed channel.
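The nil restriction presumably exists because nil is what a reader gets from a closed channel. The sketch below assumes the CSPEx library is loaded. The exception type is not stated on this page, so any exception is rescued, and wrapping values in a {:value, term} tuple is a hypothetical convention for this illustration only.

```elixir
# Assumes the CSPEx library is compiled and loaded.
alias CSP.{Buffer, Channel}

channel = Channel.new(buffer: Buffer.Blocking.new(1))

# Putting nil raises; the exception type is an unknown here,
# so any exception is rescued and only its message is kept.
outcome =
  try do
    Channel.put(channel, nil)
  rescue
    error -> {:raised, Exception.message(error)}
  end

# Hypothetical workaround: wrap the value so that nil can still
# be carried as a payload.
Channel.put(channel, {:value, nil})
wrapped = Channel.get(channel)
```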

Example

iex> channel = Channel.new(buffer: CSP.Buffer.Blocking.new(5))
iex> Channel.put(channel, :data)
iex> Channel.put(channel, :other)
iex> Enum.take(channel, 2)
[:data, :other]

Non-linking version of CSP.Channel.start_link/1

start_link(options \\ [])
start_link(options()) :: GenServer.on_start()

Function responsible for the starting of the channel.

Ideal for using a channel in a supervision tree.

with_buffer(source, size)
with_buffer(t(), non_neg_integer()) :: t()

Creates a channel that buffers events from a source channel.

The created channel is closed when the source channel is closed.

Example

iex> unbuffered = Channel.new()
iex> buffered = Channel.with_buffer(unbuffered, 5)
iex> Enum.into(1..5, unbuffered)
iex> Enum.take(buffered, 5)
[1, 2, 3, 4, 5]

wrap/1

Wraps the PID or registered name in a Channel struct.

If the passed in value is already a Channel struct, return it unchanged.

Example

iex> {:ok, pid} = Channel.start_link(buffer: CSP.Buffer.Blocking.new(5))
iex> channel = Channel.wrap(pid)
iex> Enum.into(1..5, channel)
iex> Channel.close(channel)
iex> Enum.to_list(channel)
[1, 2, 3, 4, 5]

iex> channel = Channel.new()
iex> channel == Channel.wrap(channel)
true