Runbox.Runtime.Timezip.MultiQueue (runbox v21.2.0)
Buffering multi-queue data structure for Timezip.
If the structure were a heap, each operation would cost O(log N), where N is the number of messages (bounded by the max demand). With the multi-queue, each operation costs roughly O(M), where M is the number of producers.
The producers / queues are ordered. Messages with the same timestamp in different queues are emitted in this order.
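The O(M) cost and the tie-breaking rule can be illustrated with a single pass over the queue heads. This is only a sketch, not the runbox implementation: the module `HeadScanSketch`, the function `min_head/1`, and the `{timestamp, payload}` message shape are assumptions made for the example. It relies on `Enum.min_by/2` returning the first minimal element it finds, so a tie goes to the queue that comes earlier in the list.

```elixir
defmodule HeadScanSketch do
  # Hypothetical helper, not part of runbox.
  # `heads` is a non-empty, ordered list of {queue_id, head_message} pairs,
  # where each message is assumed to be a {timestamp, payload} tuple.
  # One pass over M heads; on equal timestamps the earlier queue wins,
  # because Enum.min_by/2 keeps the first minimal element it encounters.
  def min_head(heads) do
    Enum.min_by(heads, fn {_id, {ts, _payload}} -> ts end)
  end
end
```

For example, with heads `[a: {5, "a1"}, b: {3, "b1"}, c: {3, "c1"}]` the scan selects the head of queue `:b`, since `:b` precedes `:c` and both carry timestamp 3.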
Summary
Functions
Adds a new queue to the MultiQueue.
Main logic; returns messages from all queues zipped (sorted) by minimum timestamp.
Adds messages for a queue id to the MultiQueue.
Removes a queue and its contents from the MultiQueue.
Types
@type demand_distribution() :: %{optional(queue_id()) => pos_integer()}
@type queue_id() :: any()
@type t() :: %Runbox.Runtime.Timezip.MultiQueue{ queues: [{queue_id(), :queue.queue()}] }
Functions
Adds a new queue to the MultiQueue.
The new, empty queue is added at the end.
dequeue_all(multi_queue, comparator \\ &default_ts_tuple_comparator/2)
@spec dequeue_all(t(), comparator :: (msg, msg -> boolean())) :: {:ok, t(), demand_distribution(), emitted :: [msg]} | {:error, term()} when msg: term()
Main logic; returns messages from all queues zipped (sorted) by minimum timestamp.
Returns as many messages as possible, stopping as soon as some queue is empty.
The values in the returned tuple are:
multi_queue: the multi-queue without the emitted messages
demand_distribution: a map of id -> integer() giving how many of the emitted messages originate from each queue id; it can be used for manual demand handling in GenStage
emitted: the messages returned from the front of the multi-queue
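The behaviour described above can be sketched as a loop over an ordered list of `{queue_id, :queue.queue()}` pairs. This is an illustration under stated assumptions, not the library's code: the module `ZipSketch`, the function `zip_all/2`, the `{timestamp, payload}` message shape, and the reading of the comparator as "the first message may be emitted no later than the second" are all hypothetical. The `{:error, term()}` result is not modelled.

```elixir
defmodule ZipSketch do
  # Illustrative only; not the runbox implementation.
  # queues: ordered list of {queue_id, :queue.queue()} pairs.
  # comparator: assumed to return true when the first message may be
  # emitted no later than the second one.
  def zip_all(queues, comparator \\ fn {ts1, _}, {ts2, _} -> ts1 <= ts2 end) do
    loop(queues, comparator, %{}, [])
  end

  defp loop(queues, comparator, dist, acc) do
    case pick(queues, comparator) do
      :empty ->
        # Some queue has no head (or there are no queues): stop emitting.
        {:ok, queues, dist, Enum.reverse(acc)}

      {id, msg} ->
        # Drop the emitted head from its queue and count it for that id.
        queues =
          Enum.map(queues, fn
            {^id, q} -> {id, :queue.drop(q)}
            other -> other
          end)

        loop(queues, comparator, Map.update(dist, id, 1, &(&1 + 1)), [msg | acc])
    end
  end

  defp pick([], _comparator), do: :empty

  defp pick(queues, comparator) do
    # One O(M) pass over the heads; halts with :empty if any queue is empty.
    Enum.reduce_while(queues, nil, fn {id, q}, best ->
      case :queue.peek(q) do
        :empty ->
          {:halt, :empty}

        {:value, msg} ->
          cond do
            best == nil -> {:cont, {id, msg}}
            # Keep the earlier queue on ties, preserving queue order.
            comparator.(elem(best, 1), msg) -> {:cont, best}
            true -> {:cont, {id, msg}}
          end
      end
    end)
  end
end
```

With queue `:a` holding timestamps 1 and 3 and queue `:b` holding 1, 2 and 4, this sketch emits the messages stamped 1 (from `:a`), 1 (from `:b`), 2 and 3, then stops because `:a` is empty. The distribution is `%{a: 2, b: 2}`, and the message stamped 4 stays buffered in `:b`.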
Adds messages for a queue id to the MultiQueue.
Removes a queue and its contents from the MultiQueue.