GenStage.PartitionDispatcher (gen_stage v1.2.1)

A dispatcher that sends events according to partitions.

This dispatcher assumes that partitions are evenly distributed. See the "Even distribution" section for more information.

When multiple consumers subscribe to one partition, the producer behaves like a GenStage.DemandDispatcher within that partition.

Options

The partition dispatcher accepts the following options on initialization:

  • :partitions - the partitions to dispatch to. It may be an integer giving the total number of partitions, in which case the partitions are named from 0 up to integer - 1. For example, partitions: 4 creates four partitions named 0, 1, 2 and 3.

    It may also be an enumerable that specifies the name of each partition. For instance, partitions: [:odd, :even] will build two partitions, named :odd and :even.

  • :hash - the hashing algorithm. It's a function of type hash_function/0, which receives the event and returns a tuple with two elements: the event to be dispatched and the partition to dispatch it to. The function can also return :none, in which case the event is discarded. The partition must be one of the partitions specified in :partitions above. The default uses:

    fn event -> {event, :erlang.phash2(event, Enum.count(partitions))} end
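A custom :hash function could look like the sketch below. Because the default hash returns an integer, partitions with non-integer names such as :odd and :even need a function of their own. The ParityHash module is a hypothetical helper written for this example and is not part of GenStage; it routes integers by parity and discards everything else by returning :none.

```elixir
defmodule ParityHash do
  # Hypothetical helper module for illustration; not part of GenStage.
  # Returns {event, partition} for integers and :none for any other term.
  def hash(event) when is_integer(event) do
    partition = if rem(event, 2) == 0, do: :even, else: :odd
    {event, partition}
  end

  # Non-integer events are discarded by the dispatcher.
  def hash(_event), do: :none
end

# The dispatcher option as it would appear in a producer's init/1 return value.
dispatcher =
  {GenStage.PartitionDispatcher, partitions: [:odd, :even], hash: &ParityHash.hash/1}
```

Keeping the function in a named module makes it possible to pass it as a capture (&ParityHash.hash/1) and to test it on its own, without starting any stage.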

Examples

To start a producer with four partitions named 0, 1, 2, and 3:

{:producer, state, dispatcher: {GenStage.PartitionDispatcher, partitions: 0..3}}

To start a producer with two partitions named :odd and :even:

{:producer, state, dispatcher: {GenStage.PartitionDispatcher, partitions: [:odd, :even]}}

Subscribe options

When subscribing to a GenStage that uses a partition dispatcher, the following option is required:

  • :partition - the name of the partition. The partition must be one of the partitions specified in :partitions above.

Examples

The :partition option can be given either in the :subscribe_to option returned from the consumer's init/1:

{:consumer, :ok, subscribe_to: [{producer, partition: 0}]}

Or when calling sync_subscribe:

GenStage.sync_subscribe(consumer, to: producer, partition: 0)
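Putting the pieces together, the following is a minimal sketch of a producer with two partitions and one consumer per partition. The Numbers and Printer modules are hypothetical names chosen for this example, and the script assumes the gen_stage package can be fetched with Mix.install/1.

```elixir
# Assumes network access so that Mix.install/1 can fetch gen_stage.
Mix.install([{:gen_stage, "~> 1.2"}])

defmodule Numbers do
  # Hypothetical producer that emits consecutive integers on demand.
  use GenStage

  def init(counter) do
    {:producer, counter, dispatcher: {GenStage.PartitionDispatcher, partitions: 2}}
  end

  def handle_demand(demand, counter) when demand > 0 do
    events = Enum.to_list(counter..(counter + demand - 1))
    {:noreply, events, counter + demand}
  end
end

defmodule Printer do
  # Hypothetical consumer that prints the events routed to its partition.
  use GenStage

  def init(partition), do: {:consumer, partition}

  def handle_events(events, _from, partition) do
    IO.inspect(length(events), label: "partition #{partition} received")
    {:noreply, [], partition}
  end
end

{:ok, producer} = GenStage.start_link(Numbers, 0)

# One consumer per partition; each names its partition when subscribing.
for partition <- 0..1 do
  {:ok, consumer} = GenStage.start_link(Printer, partition)
  {:ok, _tag} = GenStage.sync_subscribe(consumer, to: producer, partition: partition)
end

# Give the stages a moment to exchange events before the script exits.
Process.sleep(100)
```

Since no :hash is given, events are routed with the default hash shown in the Options section, so each integer goes to partition 0 or 1.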

Even distribution

This dispatcher assumes that events are evenly distributed across partitions. If the data stays uneven for long periods of time, you may end up buffering excessive data for the busy partitions. This happens because the producer is unable to distinguish from which particular consumer/partition demand arrives.

Let's see an example. Imagine you have three consumers, each for one partition: A, B, and C.

Let's assume 60% of the data goes to A, 20% to B, and 20% to C. Let's also say that max_demand is 10 and min_demand is 5. When the consumers initially request data (10 events each), the producer receives a total demand of 30. A will receive 18 of those events (60%), while B and C receive 6 each (20%). After processing 5 events (the min_demand), each consumer requests an additional 5 events, for a total of 15 additional events. Of those, 9 go to A and 3 each go to B and C. At the end of these two rounds, we will have:

A = 18 - 5 + 9 = 22 events
B = 6 - 5 + 3 = 4 events
C = 6 - 5 + 3 = 4 events

Furthermore, as B and C request more items, A will only go further behind. This behaviour is fine for spikes that should quickly resolve, but it can be problematic if the data is consistently uneven.
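The arithmetic of the two rounds above can be reproduced with a short sketch. The UnevenDemo module is a hypothetical helper that only models the numbers in this example (a 60/20/20 split, max_demand of 10, min_demand of 5); it does not reflect how GenStage dispatches internally.

```elixir
defmodule UnevenDemo do
  # Hypothetical helper for illustration only; models the example's arithmetic.
  # Assumed share of the data per partition, in percent.
  @shares %{a: 60, b: 20, c: 20}

  # Splits a total demand across partitions according to the assumed shares.
  def split(total) do
    Map.new(@shares, fn {name, percent} -> {name, div(total * percent, 100)} end)
  end

  # Events held per consumer after the initial request and one follow-up
  # request, with each consumer having processed min_demand events in between.
  def two_rounds(max_demand \\ 10, min_demand \\ 5) do
    consumers = map_size(@shares)
    first = split(consumers * max_demand)
    second = split(consumers * min_demand)

    Map.new(first, fn {name, received} ->
      {name, received - min_demand + second[name]}
    end)
  end
end

IO.inspect(UnevenDemo.two_rounds())
# prints %{a: 22, b: 4, c: 4}
```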

Types

hash_function()

(since 1.2.0)
@type hash_function() ::
  (event :: any() -> {event :: any(), partition :: any()} | :none)

The type used for the function passed to the :hash option.