Runbox.Runtime.Stage.Timezip.MultiQueue (runbox v1.2.0)

Buffering multi-queue data structure for Timezip.

If the structure would be heap, the complexity would be Olog(N) where N is number of messages (topped by max demand). With multi-queue operation is ~ O(M) where M is number of producers.

Link to this section Summary

Functions

Adds new queue to MultiQueue.

Main logic, will return messages from all queues zipped/sorted by min timestamp.

Adds messages for queue id to multiqueue.

Removes queue and its content from multi_queue

Link to this section Types

Link to this type

demand_distribution()

@type demand_distribution() :: %{optional(term()) => pos_integer()}
@type t() :: %Runbox.Runtime.Stage.Timezip.MultiQueue{
  queues: %{optional(term()) => :queue.queue()}
}

Link to this section Functions

Link to this function

add_queue(multi_queue, queue_id)

Adds new queue to MultiQueue.

If adding same id again -> clears buffered messages for given producer.

Link to this function

dequeue_all(multi_queue, comparator \\ &default_ts_tuple_comparator/2)

@spec dequeue_all(t(), (msg, msg -> boolean())) ::
  {:ok, t(), demand_distribution(), emitted :: [msg]} | {:error, term()}
when msg: term()

Main logic, will return messages from all queues zipped/sorted by min timestamp.

Will return as many messages as posible, until some queue is empty.

In returned tuple, returned values means:

  • multi_queue: multi-queue without emitted messages

  • demand_distribution: map with id -> integer(), meaning how many messages in output originates from particular queue id, can be used for manual demand handling in genstage

  • emitted: returned messages from multiqueue front

Link to this function

enqueue(buffer, id, messages)

@spec enqueue(t(), term(), [term()]) :: {:ok, t()} | {:error, term()}

Adds messages for queue id to multiqueue.

Link to this function

remove_queue(multi_queue, queue_id)

Removes queue and its content from multi_queue