Vessel v0.8.0 Vessel.Relay

This module acts as a relay IO stream to allow redirection of pairs written by Vessel. This is mainly provided for in-application MapReduce and for testing.

Conceptually, this module is simply a buffer of the messages received from the IO stream. A few utility functions let you read the buffer, forward it to another process, or flush it entirely.
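The buffer idea described above can be sketched in a few lines of plain Elixir. This is not Vessel's implementation: `RelaySketch` and its functions are hypothetical names used only for illustration, and the sketch assumes the process receives writes through the standard Erlang I/O protocol (`:io_request` / `:io_reply` messages), handling only `put_chars` requests.

```elixir
defmodule RelaySketch do
  # Hypothetical stand-in for a relay: a GenServer whose state is a list buffer.
  use GenServer

  def start_link, do: GenServer.start_link(__MODULE__, [])

  # Newest message first, because messages are prepended on arrival.
  def raw(pid), do: GenServer.call(pid, :raw)

  # Reverse the raw buffer to get messages in order of reception.
  def get(pid), do: pid |> raw() |> Enum.reverse()

  # Throw the buffer away.
  def flush(pid), do: GenServer.call(pid, :flush)

  @impl true
  def init(buffer), do: {:ok, buffer}

  @impl true
  def handle_call(:raw, _from, buffer), do: {:reply, buffer, buffer}
  def handle_call(:flush, _from, _buffer), do: {:reply, :ok, []}

  # Acknowledge the write, then prepend the written data to the buffer.
  @impl true
  def handle_info({:io_request, from, reply_as, {:put_chars, _encoding, chars}}, buffer) do
    send(from, {:io_reply, reply_as, :ok})
    {:noreply, [IO.chardata_to_string(chars) | buffer]}
  end
end

{:ok, relay} = RelaySketch.start_link()

# Because the process speaks the I/O protocol, it can be used as an IO device.
IO.write(relay, "a\t1\n")
IO.write(relay, "b\t2\n")

raw_buffer = RelaySketch.raw(relay)
ordered = RelaySketch.get(relay)

:ok = RelaySketch.flush(relay)
after_flush = RelaySketch.get(relay)
```

Prepending keeps each write cheap, which is why the raw buffer comes back newest-first and the ordered view needs a reverse.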

An example usage of this module is to start the Relay and pass the process id through to the stdout/stderr options inside the consume/1 callback of a Vessel mapper or reducer. The output messages will then be received by Relay and can be easily used to test your components from inside unit tests.

Summary

Functions

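create(opts \\ [])

Creates a new Relay worker

flush(pid)

Flushes the Relay buffer

forward(pid, ref \\ self())

Forwards the entire Relay buffer to a process

get(pid)

Retrieves the ordered Relay buffer

raw(pid)

Retrieves the raw Relay buffer

sort(pid, comparator \\ &default_sort/2)

Retrieves a sorted Relay buffer

stop(pid)

Stops a Relay worker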

Types

server()
server() :: GenServer.server

Functions

create(opts \\ [])

Creates a new Relay worker.

Workers are always linked to the current process, as they’re designed to be used from within ExUnit and to be short-lived (to avoid leaking processes).

flush(pid)
flush(server) :: :ok

Flushes the Relay buffer.

This simply throws away the current buffer stored in the Relay. No other action removes entries from the stored buffer, and there is no way to modify it.

forward(pid, ref \\ self())
forward(server, server) :: :ok

Forwards the entire Relay buffer to a process.

This is to aid in testing, so that you can simply use receive assertions. It sends one tuple message per buffer element (in order of reception) of the form { :relay, msg }.
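The receiving side of this can be sketched without a running Relay. In the snippet below, the forwarding step is emulated with plain `send/2`, and the buffer contents are hypothetical; only the `{ :relay, msg }` message shape is taken from the description above.

```elixir
# Hypothetical buffer contents, in order of reception.
buffer = ["a\t1\n", "b\t2\n"]

# Emulates the documented behaviour of forward/2:
# one {:relay, msg} tuple per buffer element, sent in order.
Enum.each(buffer, fn msg -> send(self(), {:relay, msg}) end)

# Receiving side: pull one {:relay, msg} message per expected element.
received =
  for _ <- buffer do
    receive do
      {:relay, msg} -> msg
    after
      100 -> :timeout
    end
  end

IO.inspect(received)
```

Inside an ExUnit test, the `receive` block would typically be replaced with `assert_receive {:relay, "a\t1\n"}` and so on for each expected message.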

get(pid)
get(server) :: [binary]

Retrieves the ordered Relay buffer.

This pulls back the raw buffer from the Relay and reverses it, to ensure that messages are correctly ordered.

raw(pid)
raw(server) :: [binary]

Retrieves the raw Relay buffer.

This is returned in reverse order; if you want the correctly ordered buffer, please use Vessel.Relay.get/1.

sort(pid, comparator \\ &default_sort/2)
sort(server, (binary, binary -> integer)) :: [binary]

Retrieves a sorted Relay buffer.

The sort defaults to Hadoop’s default ordering, but it can be overridden with a custom comparator (for example, if you tweak Hadoop’s sorting).

For now, you have to write your own comparator if you need a custom sort, as I have neither the time nor the energy to implement a GNU-like sort parser.
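As a rough illustration of a hand-written comparator, the sketch below orders tab-separated lines by their numeric value field using `Enum.sort/2`. The buffer contents and the `by_value` function are hypothetical. Note that the typespec above lists an integer return, whereas this sketch uses a boolean-returning comparator in the style of `Enum.sort/2`; whether such a function can be passed to sort/2 unchanged is an assumption to check against the source.

```elixir
# Hypothetical buffer contents: tab-separated key/value lines.
buffer = ["b\t2\n", "a\t10\n", "c\t1\n"]

# Hypothetical comparator: order by the numeric value field, ascending.
# Returns true when `left` should sort before (or equal to) `right`.
by_value = fn left, right ->
  value = fn line ->
    [_key, val] =
      line
      |> String.trim_trailing("\n")
      |> String.split("\t", parts: 2)

    String.to_integer(val)
  end

  value.(left) <= value.(right)
end

sorted = Enum.sort(buffer, by_value)
IO.inspect(sorted)
```

A plain lexical sort would place "a\t10\n" first; comparing the parsed integers instead is the kind of behaviour a custom comparator is needed for.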

stop(pid)
stop(server) :: :ok

Stops a Relay worker.

This terminates an existing Relay process using GenServer.stop/1. It is a shorthand that hides the implementation details from the user.