Module emmap_queue

The FIFO queue can be used as a persistent container of messages with constant time push and pop operations.

Authors: Serge Aleynikov.

Description

Persistent FIFO queue The FIFO queue can be used as a persistent container of messages with constant time push and pop operations. Additionally, this module provides a gen_server API, which wraps the queue for use in multi-process applications. The queue is stored in a memory-mapped file, and it automatically grows if the messages are not dequeued from the queue. Messages stored in the queue can be compressed using variable compression level controlled by the argument to the push/3 function. See test cases at the end of this module for sample use cases.

Data Types

queue()

queue() = emmap:mmap_file()

Function Index

close/1Close a previously open queue.
dequeue/1Dequeue a message from the queue.
enqueue/2Enqueue a message to the queue.
erase/1Erase the content of the queue.
flush/1Asynchronously flush the modified memory used by the queue to disk.
handle_call/3
handle_cast/2
handle_info/2
info/1Return queue info as a map.
init/1
inspect/1Peek a message from the queue without dequeuing it.
is_empty/1Returns true if the queue is empty.
length/1Get queue length.
metadata/1Get queue metadata.
open/3Open a memory mapped queue.
peek/3Inspect all messages in the queue iteratively by passing them to a custom lambda function.
peek_back/1Peek the last written term at the back of the FIFO queue without removing it.
peek_front/1Peek the next awaiting term from the head of the FIFO queue without popping it.
pop/1Pop a term from the queue.
pop/3Pop messages from the queue by passing them to a custom lambda function.
pop_and_purge/1Pop a term from the queue and reclaim queue's memory if the queue is empty.
purge/1Purge queue.
push/2Push a term to the queue.
push/3Push a term to the queue.
rpeek/3Inspect all messages in the queue iteratively in the reverse order by passing them to a custom lambda function.
start_link/4Start a queue process.
terminate/2
try_dequeue/2Dequeue a message from the queue.
try_pop/2Evaluate the Fun function on the next term in the queue.
try_pop_and_purge/2Evaluate the Fun function on the next term in the queue.

Function Details

close/1

close(Mem::queue()) -> ok

Close a previously open queue.

dequeue/1

dequeue(Name) -> any()

Dequeue a message from the queue

enqueue/2

enqueue(Name, Term) -> any()

Enqueue a message to the queue

erase/1

erase(Mem) -> any()

Erase the content of the queue. All messages in the queue are discarded!

flush/1

flush(Mem::queue()) -> ok

Asynchronously flush the modified memory used by the queue to disk. See notes of emmap:sync/1. This call is optional.

handle_call/3

handle_call(X1, From, State) -> any()

handle_cast/2

handle_cast(Msg, State) -> any()

handle_info/2

handle_info(X1, State) -> any()

info/1

info(Name::atom()) -> map()

Return queue info as a map

init/1

init(X1) -> any()

inspect/1

inspect(Name) -> any()

Peek a message from the queue without dequeuing it

is_empty/1

is_empty(Mem) -> any()

Returns true if the queue is empty.

length/1

length(Mem) -> any()

Get queue length. This function has a linear-time complexity.

metadata/1

metadata(Mem::queue()) -> #{head => integer(), tail => integer(), next_tail => integer(), size => integer()}

Get queue metadata

open/3

open(Filename::binary() | list(), Size::integer(), Opts::list()) -> {ok, queue()} | {error, term()}

Open a memory mapped queue.

peek/3

peek(Mem, Init, Fun) -> any()

Inspect all messages in the queue iteratively by passing them to a custom lambda function. The Fun takes a message and state and returns a {cont, State} to continue or {halt, State} to abort.

peek_back/1

peek_back(Mem) -> any()

Peek the last written term at the back of the FIFO queue without removing it. This function has a constant-time complexity.

peek_front/1

peek_front(Mem) -> any()

Peek the next awaiting term from the head of the FIFO queue without popping it. This function has a constant-time complexity.

pop/1

pop(Mem) -> any()

Pop a term from the queue. This function has a constant-time complexity.

pop/3

pop(Mem, Init, Fun) -> any()

Pop messages from the queue by passing them to a custom lambda function. The Fun takes a message and state and returns a {cont, State} to continue or {halt, State} to abort.

pop_and_purge/1

pop_and_purge(Mem) -> any()

Pop a term from the queue and reclaim queue's memory if the queue is empty. This function has a constant-time complexity.

purge/1

purge(Mem::queue()) -> boolean()

Purge queue. It is a constant time operation.

push/2

push(Mem, Term) -> any()

Push a term to the queue. This function has a constant-time complexity.

push/3

push(Mem, Term, Compression) -> any()

Push a term to the queue. This function has a constant-time complexity. Compression is the compression level from 0 to 9, where 0 is no compression.

rpeek/3

rpeek(Mem, Init, Fun) -> any()

Inspect all messages in the queue iteratively in the reverse order by passing them to a custom lambda function. The Fun takes a message and state and returns a {cont, State} to continue or {halt, State} to abort.

start_link/4

start_link(Name, Filename, SegmSize, Opts) -> any()

Start a queue process. Name - name of the registered process. Filename - name of the memory mapped file. SegmSize - size of the memory mapped segment (can be 0 for an existing file). Opts - see emmap:open_options().

terminate/2

terminate(Reason, State) -> any()

try_dequeue/2

try_dequeue(Name, Fun) -> any()

Dequeue a message from the queue. The function returns Res where Res is the result of evaluating the Fun with the poped element from the queue. If the Fun throws an exception, the exception if propagated to the caller.

try_pop/2

try_pop(Mem, Fun) -> any()

Evaluate the Fun function on the next term in the queue. If the function doesn't raise an exception, pop the term from the queue, otherwise leave the queue unmodified. This function has a constant-time complexity. It returns the result of evaluating Fun.

try_pop_and_purge/2

try_pop_and_purge(Mem, Fun) -> any()

Evaluate the Fun function on the next term in the queue. If the function doesn't raise an exception, pop the term from the queue, otherwise leave the queue unmodified. This function has a constant-time complexity. It returns the result of evaluating Fun.