Runbox.Runtime.Timezip.MultiQueue (runbox v21.2.0)

Buffering multi-queue data structure for Timezip.

If the structure were a heap, each operation would cost O(log N), where N is the number of messages (capped by the max demand). With the multi-queue, an operation costs roughly O(M), where M is the number of producers.

The producers (queues) are ordered. Messages that share a timestamp but sit in different queues are emitted in this queue order.

Summary

Functions

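add_queue(multi_queue, queue_id)

Adds a new queue to the MultiQueue.

dequeue_all(multi_queue, comparator \\ &default_ts_tuple_comparator/2)

Main logic: returns messages from all queues, zipped (sorted) by minimum timestamp.

enqueue(buffer, id, messages)

Adds messages to the queue with the given id.

remove_queue(multi_queue, queue_id)

Removes a queue and its content from the MultiQueue.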

Types

@type demand_distribution() :: %{optional(queue_id()) => pos_integer()}
@type queue_id() :: any()
@type t() :: %Runbox.Runtime.Timezip.MultiQueue{
  queues: [{queue_id(), :queue.queue()}]
}

Functions

add_queue(multi_queue, queue_id)

@spec add_queue(t(), queue_id()) :: t()

Adds a new queue to the MultiQueue.

The new queue is empty and is appended at the end of the queue order.

dequeue_all(multi_queue, comparator \\ &default_ts_tuple_comparator/2)

@spec dequeue_all(t(), comparator :: (msg, msg -> boolean())) ::
  {:ok, t(), demand_distribution(), emitted :: [msg]} | {:error, term()}
when msg: term()

Main logic: returns messages from all queues, zipped (sorted) by minimum timestamp.

Returns as many messages as possible, stopping as soon as any queue becomes empty.

The elements of the returned tuple mean:

  • multi_queue: the multi-queue without the emitted messages

  • demand_distribution: a map of queue id => integer, giving how many of the emitted messages originate from each queue id; it can be used for manual demand handling in GenStage

  • emitted: the messages taken from the front of the multi-queue
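The zipping behaviour described above can be pictured with a minimal, self-contained sketch. It is not the Runbox implementation: the module name `ZipSketch` is hypothetical, the multi-queue is modelled as a bare ordered list of `{id, :queue.queue()}` pairs instead of the struct, messages are assumed to be `{timestamp, payload}` tuples, and the custom comparator argument is left out.

```elixir
defmodule ZipSketch do
  # Illustrative stand-in, not the Runbox implementation.
  # Assumptions: queues are an ordered list of {id, :queue.queue()} pairs
  # and every message is a {timestamp, payload} tuple.

  def dequeue_all(queues), do: loop(queues, %{}, [])

  defp loop(queues, demand, acc) do
    if queues == [] or Enum.any?(queues, fn {_id, q} -> :queue.is_empty(q) end) do
      # Stop as soon as any queue has run dry (presumably because a later
      # message from that queue could still precede the remaining heads).
      {:ok, queues, demand, Enum.reverse(acc)}
    else
      # Linear scan over the M queue heads. Enum.min_by/2 keeps the first
      # minimal element, so equal timestamps follow the queue order.
      {id, q} = Enum.min_by(queues, fn {_id, q} -> elem(:queue.get(q), 0) end)
      {{:value, msg}, rest} = :queue.out(q)
      queues = List.keyreplace(queues, id, 0, {id, rest})
      loop(queues, Map.update(demand, id, 1, &(&1 + 1)), [msg | acc])
    end
  end
end

queues = [
  a: :queue.from_list([{1, "a1"}, {3, "a3"}]),
  b: :queue.from_list([{1, "b1"}, {2, "b2"}, {5, "b5"}])
]

{:ok, rest, demand, emitted} = ZipSketch.dequeue_all(queues)
leftover_b = :queue.to_list(rest[:b])
# emitted    == [{1, "a1"}, {1, "b1"}, {2, "b2"}, {3, "a3"}]
# demand     == %{a: 2, b: 2}
# leftover_b == [{5, "b5"}]  (held back because queue :a is now empty)
```

In this sketch each emitted message costs one pass over the queue heads, which matches the roughly O(M) cost per operation mentioned in the module description. The two messages with timestamp 1 come out in queue order (`:a` before `:b`).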

enqueue(buffer, id, messages)

@spec enqueue(t(), queue_id(), [message :: term()]) :: {:ok, t()} | {:error, term()}

Adds messages to the queue with the given id.
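As a rough illustration of what enqueueing involves, the sketch below appends messages to one queue in an ordered list of `{id, :queue.queue()}` pairs. The module name `EnqueueSketch` is hypothetical, the list stands in for the real struct, and the error case is an assumption: the spec only says that `{:error, term()}` can be returned, so treating an unknown queue id as the failure, and the `{:unknown_queue, id}` reason, are invented here for illustration.

```elixir
defmodule EnqueueSketch do
  # Illustrative stand-in, not the Runbox implementation.

  def enqueue(queues, id, messages) do
    case List.keyfind(queues, id, 0) do
      {_id, q} ->
        # Append in arrival order so the oldest message stays at the front.
        q = Enum.reduce(messages, q, fn msg, acc -> :queue.in(msg, acc) end)
        {:ok, List.keyreplace(queues, id, 0, {id, q})}

      nil ->
        # ASSUMPTION: both this failure mode and the reason term are made up.
        {:error, {:unknown_queue, id}}
    end
  end
end

queues = [a: :queue.new(), b: :queue.new()]

{:ok, queues} = EnqueueSketch.enqueue(queues, :a, [{1, "x"}, {2, "y"}])
a_contents = :queue.to_list(queues[:a])
missing = EnqueueSketch.enqueue(queues, :z, [{3, "w"}])
# a_contents == [{1, "x"}, {2, "y"}]
# missing    == {:error, {:unknown_queue, :z}}
```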

remove_queue(multi_queue, queue_id)

@spec remove_queue(t(), queue_id()) :: t()

Removes a queue and its content from the MultiQueue.
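Because the queue order decides how timestamp ties are resolved, it can help to see how adding and removing queues might affect that order. The sketch below works on a bare ordered list of `{id, :queue.queue()}` pairs, the shape of the `queues` field in `t()`. The module name `QueueListSketch` is hypothetical, and how the real module treats duplicate or unknown ids is not covered here.

```elixir
defmodule QueueListSketch do
  # Illustrative stand-in, not the Runbox implementation.

  # A new, empty queue goes to the end of the order.
  def add_queue(queues, id), do: queues ++ [{id, :queue.new()}]

  # Dropping a queue discards its buffered messages and keeps the
  # relative order of the remaining queues.
  def remove_queue(queues, id), do: List.keydelete(queues, id, 0)
end

queues =
  []
  |> QueueListSketch.add_queue(:a)
  |> QueueListSketch.add_queue(:b)
  |> QueueListSketch.add_queue(:c)
  |> QueueListSketch.remove_queue(:b)

ids = Enum.map(queues, fn {id, _q} -> id end)
# ids == [:a, :c]
```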