Runbox.Runtime.Stage.Timezip.MultiQueue (runbox v6.0.0)
Buffering multi-queue data structure for Timezip.
If the structure would be heap, the complexity would be Olog(N) where N is number of messages (topped by max demand). With multi-queue operation is ~ O(M) where M is number of producers.
Link to this section Summary
Functions
Adds new queue to MultiQueue.
Main logic, will return messages from all queues zipped/sorted by min timestamp.
Adds messages for queue id to multiqueue.
Removes queue and its content from multi_queue
Link to this section Types
demand_distribution()
@type demand_distribution() :: %{optional(term()) => pos_integer()}
@type t() :: %Runbox.Runtime.Stage.Timezip.MultiQueue{ queues: %{optional(term()) => :queue.queue()} }
Link to this section Functions
add_queue(multi_queue, queue_id)
Adds new queue to MultiQueue.
If adding same id again -> clears buffered messages for given producer.
dequeue_all(multi_queue, comparator \\ &default_ts_tuple_comparator/2)
@spec dequeue_all(t(), (msg, msg -> boolean())) :: {:ok, t(), demand_distribution(), emitted :: [msg]} | {:error, term()} when msg: term()
Main logic, will return messages from all queues zipped/sorted by min timestamp.
Will return as many messages as posible, until some queue is empty.
In returned tuple, returned values means:
multi_queue: multi-queue without emitted messages
demand_distribution: map with
id -> integer()
, meaning how many messages in output originates from particular queue id, can be used for manual demand handling in genstageemitted: returned messages from multiqueue front
enqueue(buffer, id, messages)
Adds messages for queue id to multiqueue.
new()
remove_queue(multi_queue, queue_id)
Removes queue and its content from multi_queue