Honeydew (honeydew v1.4.0)
A pluggable job queue + worker pool for Elixir.
Summary
Types
Result of a Honeydew.Job
Functions
Runs a task asynchronously
Cancels a job
Cancels the job associated with the first argument
Filters the jobs currently on the queue
Moves a job to another queue
Returns a list of queues running on this node
Re-initializes the given worker. This is intended to be used from within a worker’s Honeydew.Worker.failed_init/0 callback. Using it anywhere else may cause undefined behavior, so at present, don’t
Resumes job processing for a queue
Starts a queue under Honeydew’s supervision tree
Starts workers under Honeydew’s supervision tree
Stops the local instance of the provided queue name
Stops the local workers for the provided queue name
Suspends job processing for a queue
Returns a list of queues that have workers running on this node
Wait for a job to complete and return result
Types
async_opt() :: {:reply, true} | {:delay_secs, pos_integer()}
queue_spec_opt() :: {:queue, mod_or_mod_args()} | {:dispatcher, mod_or_mod_args()} | {:failure_mode, mod_or_mod_args() | nil} | {:success_mode, mod_or_mod_args() | nil} | {:supervisor_opts, supervisor_opts()} | {:suspended, boolean()}
Result of a Honeydew.Job
worker_opt() :: {:num, non_neg_integer()} | {:init_retry_secs, pos_integer()} | {:shutdown, non_neg_integer()} | {:nodes, [node()]}
Functions
async(task(), queue_name(), [async_opt()]) :: Honeydew.Job.t() | no_return()
Runs a task asynchronously.
Raises a RuntimeError if the queue process is not available.
You can provide any of the following opts:
- reply: returns the result of the job via yield/1, see below.
- delay_secs: delays the execution of the job by the provided number of seconds.
Examples
To run a task asynchronously:
Honeydew.async({:ping, ["127.0.0.1"]}, :my_queue)
To run a task asynchronously and wait for result:
# Without pipes
job = Honeydew.async({:ping, ["127.0.0.1"]}, :my_queue, reply: true)
Honeydew.yield(job)
# With pipes
{:ok, :pong} =
  {:ping, ["127.0.0.1"]}
  |> Honeydew.async(:my_queue, reply: true)
  |> Honeydew.yield()
To run a task an hour later:
Honeydew.async({:ping, ["127.0.0.1"]}, :my_queue, delay_secs: 60*60)
cancel(Honeydew.Job.t()) :: :ok | {:error, :in_progress} | {:error, :not_found}
Cancels a job.
The return value depends on the status of the job.
- :ok - Job had not been started and was able to be cancelled.
- {:error, :in_progress} - Job was in progress and unable to be cancelled.
- {:error, :not_found} - Job was not found on the queue (or already processed) and was unable to be cancelled.
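The three return values can be handled with one function clause each. The following is a minimal sketch: the CancelExample module and its describe/1 helper are hypothetical names introduced for illustration, and it assumes :my_queue has already been started. Only Honeydew.async/2 and Honeydew.cancel/1 come from this page.

```elixir
defmodule CancelExample do
  # Hypothetical helper module, not part of Honeydew.

  # Enqueues a ping and immediately tries to cancel it.
  # Assumes :my_queue was started earlier with Honeydew.start_queue/2.
  def enqueue_then_cancel do
    {:ping, ["127.0.0.1"]}
    |> Honeydew.async(:my_queue)
    |> Honeydew.cancel()
    |> describe()
  end

  # Translates each documented return value of cancel/1 into a message.
  def describe(:ok), do: "cancelled before it started"
  def describe({:error, :in_progress}), do: "already running, not cancelled"
  def describe({:error, :not_found}), do: "not on the queue (or already processed)"
end
```

Because a worker may pick the job up between the async and cancel calls, any of the three outcomes is possible here, so the caller should be prepared for all of them.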
cancel(Honeydew.Job.private(), queue_name()) :: :ok | {:error, :in_progress} | {:error, :not_found}
Cancels the job associated with the first argument.
For example, for the Ecto Poll Queue, the first argument is the value of an ID from your schema.
The return value depends on the status of the job.
- :ok - Job had not been started and was able to be cancelled.
- {:error, :in_progress} - Job was in progress and unable to be cancelled. The Ecto Poll Queue does not support this return value.
- {:error, :not_found} - Job was not found on the queue (or already processed) and was unable to be cancelled.
filter(queue_name(), filter()) :: [Honeydew.Job.t()]
Filters the jobs currently on the queue.
Filtration support depends on the queue implementation.
ErlangQueue and Mnesia queues support filtering with functions.
Ecto Poll Queues have pre-defined, named filters. At the moment, only :abandoned is implemented.
Note:
- This function returns a List, not a Stream, so calling it can be memory intensive when invoked on a large queue.
- The filtration is done by the queue process, not the client, so a heavy filter will tie up the queue.
Examples
Filter jobs with a specific task.
Honeydew.filter(:my_queue, &match?(%Honeydew.Job{task: {:ping, _}}, &1)) # ErlangQueue or Mnesia
Honeydew.filter(:my_queue, %{task: {:ping, ["127.0.0.1"]}}) # Mnesia
Honeydew.filter(:my_queue, :abandoned) # Ecto queue
Return all jobs.
Honeydew.filter(:my_queue, fn _ -> true end)
move(Honeydew.Job.t(), to_queue :: queue_name()) :: Honeydew.Job.t() | no_return()
Moves a job to another queue.
Raises a RuntimeError if to_queue is not available.
This function first enqueues the job on to_queue, and then tries to cancel it on its current queue. This means there’s a possibility a job could be processed on both queues. This behavior is consistent with Honeydew’s at-least-once execution goal.
This function is most helpful on a queue where there are no workers (like a dead letter queue), because the job won’t be processed out from under the queue.
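As an illustration of the dead letter case, the sketch below moves every job from one queue to another by combining filter/2 with move/2. DeadLetterExample and requeue_all/3 are hypothetical names, and the third argument is a hypothetical seam that lets the function be exercised without a running queue. It assumes the source queue supports function filters (ErlangQueue or Mnesia, per the filter/2 notes).

```elixir
defmodule DeadLetterExample do
  # Hypothetical helper module, not part of Honeydew.

  # Moves every job on `from` to `to` and returns the jobs as enqueued on `to`.
  # `honeydew` defaults to the real module; it is only a parameter so the
  # sketch can be tried against a stand-in module.
  def requeue_all(from, to, honeydew \\ Honeydew) do
    from
    |> honeydew.filter(fn _job -> true end)
    |> Enum.map(&honeydew.move(&1, to))
  end
end
```

A call such as DeadLetterExample.requeue_all(:dead_letters, :my_queue) loads the whole source queue into a list first, so it is best suited to small queues.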
Returns a list of queues running on this node.
Re-initializes the given worker. This is intended to be used from within a worker’s Honeydew.Worker.failed_init/0 callback. Using it anywhere else may cause undefined behavior, so at present, don’t.
Resumes job processing for a queue.
start_queue(queue_name(), [queue_spec_opt()]) :: :ok
Starts a queue under Honeydew’s supervision tree.
name is how you’ll refer to the queue to add a task.
You can provide any of the following opts:
- queue: the module that the queue will use. Defaults to Honeydew.Queue.ErlangQueue. You may also provide args to the queue’s Honeydew.Queue.init/2 callback using the following format: {module, args}.
- dispatcher: the job dispatching strategy, {module, init_args}.
- failure_mode: the way that failed jobs should be handled. You can pass either a module, or {module, args}. The module must implement the Honeydew.FailureMode behaviour. Defaults to {Honeydew.FailureMode.Abandon, []}.
- success_mode: a callback that runs when a job successfully completes. You can pass either a module, or {module, args}. The module must implement the Honeydew.SuccessMode behaviour. Defaults to nil.
- suspended: start the queue in a suspended state. Defaults to false.
For example:
Honeydew.start_queue("my_awesome_queue")
Honeydew.start_queue("my_awesome_queue", queue: {MyQueueModule, [ip: "localhost"]}, dispatcher: {Honeydew.Dispatcher.MRU, []})
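The failure_mode and suspended options can be combined to bring a queue up without processing jobs until the caller is ready. This is a sketch only: QueueSetupExample is a hypothetical module, and Honeydew.resume/1 is assumed to be the function this page summarizes as "Resumes job processing for a queue".

```elixir
defmodule QueueSetupExample do
  # Hypothetical helper module, not part of Honeydew.

  # Options taken from the list above: the documented default failure mode,
  # spelled out explicitly, and a queue that starts suspended.
  def queue_opts do
    [
      failure_mode: {Honeydew.FailureMode.Abandon, []},
      suspended: true
    ]
  end

  # Starts the queue suspended, runs the caller's own setup, then resumes
  # job processing. Honeydew.resume/1 is an assumption (see the lead-in).
  def start_then_resume(queue, setup_fun) do
    :ok = Honeydew.start_queue(queue, queue_opts())
    setup_fun.()
    Honeydew.resume(queue)
  end
end
```

Jobs enqueued while the queue is suspended should wait on the queue until processing resumes.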
Starts workers under Honeydew’s supervision tree.
name is the name of the queue that the workers pull jobs from.
module is the module that the workers in your queue will use. You may also provide Honeydew.Worker.init/1 args with {module, args}.
You can provide any of the following opts:
- num: the number of workers to start. Defaults to 10.
- init_retry_secs: the amount of time, in seconds, a stateful worker waits before trying to re-initialize after its Honeydew.Worker.init/1 function fails. You can also override this behavior by implementing the Honeydew.Worker.init_failed/0 callback, see README/workers.md.
- shutdown: if a worker is in the middle of a job, the amount of time, in milliseconds, to wait before brutally killing it. Defaults to 10_000.
- nodes: for :global queues, you can provide a list of nodes to stay connected to (your queue node and enqueuing nodes). Defaults to [].
For example:
Honeydew.start_workers("my_awesome_queue", MyJobModule)
Honeydew.start_workers("my_awesome_queue", {MyJobModule, [key: "secret key"]}, num: 3)
Honeydew.start_workers({:global, "my_awesome_queue"}, MyJobModule, nodes: [:clientfacing@dax, :queue@dax])
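To show how a worker module relates to the tasks used in the examples on this page, here is a minimal sketch of a stateless worker and the calls that wire it to a queue. PingWorker and PingExample are hypothetical modules. The sketch assumes that a task of {:ping, ["127.0.0.1"]} results in a call to ping("127.0.0.1") on the worker module, which matches the :pong results shown in the yield/2 example but is not spelled out on this page.

```elixir
defmodule PingWorker do
  # Hypothetical stateless worker, not part of Honeydew.
  # Assumption: the task {:ping, ["127.0.0.1"]} calls ping("127.0.0.1").
  def ping(_ip) do
    # A real worker would contact the host here; this sketch only returns :pong.
    :pong
  end
end

defmodule PingExample do
  # Hypothetical module that starts a queue and three workers, enqueues one
  # job, and waits for its result.
  def run do
    :ok = Honeydew.start_queue(:my_queue)
    Honeydew.start_workers(:my_queue, PingWorker, num: 3)

    {:ping, ["127.0.0.1"]}
    |> Honeydew.async(:my_queue, reply: true)
    |> Honeydew.yield()
  end
end
```

Under these assumptions, PingExample.run/0 returns whatever yield/1 returns for the job: {:ok, result} on completion, or nil on timeout.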
stop_queue(queue_name()) :: :ok | {:error, :not_running}
Stops the local instance of the provided queue name.
stop_workers(queue_name()) :: :ok | {:error, :not_running}
Stops the local workers for the provided queue name.
Suspends job processing for a queue.
Returns a list of queues that have workers running on this node.
yield(Honeydew.Job.t(), timeout()) :: {:ok, result()} | nil | no_return()
Wait for a job to complete and return result.
Returns the result of a job, or nil on timeout. Raises an ArgumentError unless the job was created with reply: true and in the current process.
Example
Calling yield/2 with different timeouts.
iex> job = Honeydew.async({:ping, ["127.0.0.1"]}, :my_queue, reply: true)
iex> Honeydew.yield(job, 500) # Wait half a second
nil
# Result comes in at 1 second
iex> Honeydew.yield(job, 1000) # Wait up to a second
{:ok, :pong}
iex> Honeydew.yield(job, 0)
nil # <- because the message has already arrived and been handled
The only time yield/2 would ever return the result more than once is if the job executes more than once (as Honeydew aims for at-least-once execution).