gen_stage v1.0.0 GenStage.PartitionDispatcher

A dispatcher that sends events according to partitions.

Keep in mind that if events are not evenly distributed across partitions, a backed-up partition will slow down all the others.
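One way to spot an uneven distribution ahead of time is to count how a sample of events would spread over the partitions. The sketch below assumes four integer partitions and uses :erlang.phash2/2, the same function as the default :hash described under Options. The sample events are hypothetical and should be replaced with data that resembles your real events.

```elixir
# Hypothetical sample of events; swap in a representative sample of your own.
events = for id <- 1..1_000, do: {:user, id}
partitions = 4

# Count how many events the default-style hash would send to each partition.
counts =
  Enum.frequencies_by(events, fn event -> :erlang.phash2(event, partitions) end)

# Partitions that received no events are missing from the map, so fill them in with 0.
counts = Map.merge(Map.new(0..(partitions - 1), &{&1, 0}), counts)

IO.inspect(counts, label: "events per partition")
```

If one partition receives far more events than the others in such a check, a different :hash function may be worth considering.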

Options

The partition dispatcher accepts the following options on initialization:

  • :partitions - the partitions to dispatch to. It may be an integer giving the total number of partitions, in which case the partitions are named from 0 up to integer - 1. For example, partitions: 4 builds four partitions named 0, 1, 2 and 3.

    It may also be an enumerable that specifies the name of every partition. For instance, partitions: [:odd, :even] will build two partitions, named :odd and :even.

  • :hash - the hashing function, which receives the event and returns a tuple with two elements: the event to be dispatched as the first element and the partition as the second. The partition must be one of the partitions specified in :partitions above. If the function returns :none instead, the event is discarded. The default is fn event -> {event, :erlang.phash2(event, Enum.count(partitions))} end, which hashes the event to select the partition.
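As an illustration of the :hash contract, here is a minimal sketch of a custom hash function for the :odd and :even partitions. The ParityHash module is a hypothetical helper, not part of GenStage. Because the default hash returns integers, named partitions like these most likely need a custom :hash of this kind.

```elixir
defmodule ParityHash do
  # Hypothetical helper for illustration only; not part of GenStage.

  # Integers are routed by parity. The event itself is passed through unchanged.
  def hash(event) when is_integer(event) do
    partition = if rem(event, 2) == 0, do: :even, else: :odd
    {event, partition}
  end

  # Anything that is not an integer is discarded by returning :none.
  def hash(_event), do: :none
end

# The dispatcher tuple a producer could return from init/1.
dispatcher =
  {GenStage.PartitionDispatcher, partitions: [:odd, :even], hash: &ParityHash.hash/1}
```

The function may also transform the event before dispatching it, since the first tuple element is what the consumer receives.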

Examples

To start a producer with four partitions named 0, 1, 2 and 3:

{:producer, state, dispatcher: {GenStage.PartitionDispatcher, partitions: 0..3}}

To start a producer with two partitions named :odd and :even:

{:producer, state, dispatcher: {GenStage.PartitionDispatcher, partitions: [:odd, :even]}}
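For context, the tuples above are what a producer returns from its init/1 callback. The following is a minimal sketch of such a producer, assuming a standalone script and a hypothetical NumberProducer module that emits integers from a list kept in its state.

```elixir
# Assumes a standalone script; in a Mix project, list :gen_stage in mix.exs instead.
Mix.install([{:gen_stage, "~> 1.0"}])

defmodule NumberProducer do
  # Hypothetical producer for illustration only.
  use GenStage

  def start_link(numbers) when is_list(numbers) do
    GenStage.start_link(__MODULE__, numbers)
  end

  @impl true
  def init(numbers) do
    # With the default :hash, each event is sent to partition
    # :erlang.phash2(event, 4), which is one of 0, 1, 2 or 3.
    {:producer, numbers, dispatcher: {GenStage.PartitionDispatcher, partitions: 0..3}}
  end

  @impl true
  def handle_demand(demand, numbers) when demand > 0 do
    # Emit up to `demand` events and keep the rest in the state.
    {events, rest} = Enum.split(numbers, demand)
    {:noreply, events, rest}
  end
end
```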

Subscribe options

When subscribing to a GenStage with a partition dispatcher the following option is required:

  • :partition - the name of the partition. The partition must be one of the partitions specified in :partitions above.

Examples

The :partition option can be given either in the subscribe_to option returned from init:

{:consumer, :ok, subscribe_to: [{producer, partition: 0}]}

Or when calling sync_subscribe:

GenStage.sync_subscribe(consumer, to: producer, partition: 0)
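To show where the :partition option fits in a complete consumer, here is a minimal sketch, assuming a standalone script. PartitionConsumer is a hypothetical module that receives the producer and the partition name as its start argument and prints the events it handles.

```elixir
# Assumes a standalone script; in a Mix project, list :gen_stage in mix.exs instead.
Mix.install([{:gen_stage, "~> 1.0"}])

defmodule PartitionConsumer do
  # Hypothetical consumer that handles the events of a single partition.
  use GenStage

  def start_link({producer, partition}) do
    GenStage.start_link(__MODULE__, {producer, partition})
  end

  @impl true
  def init({producer, partition}) do
    # The :partition option ties this subscription to one partition of the producer.
    {:consumer, partition, subscribe_to: [{producer, partition: partition}]}
  end

  @impl true
  def handle_events(events, _from, partition) do
    IO.inspect(events, label: "partition #{inspect(partition)}")
    # Consumers do not emit events, so the events list is empty.
    {:noreply, [], partition}
  end
end
```

Under these assumptions, starting one such consumer per partition, for example with PartitionConsumer.start_link({producer, 0}) through PartitionConsumer.start_link({producer, 3}), gives every partition a subscriber.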