Honeydew (honeydew v1.5.0)

A pluggable job queue + worker pool for Elixir.

Summary

Functions

Runs a task asynchronously.

Cancels a job.

Cancels the job associated with the first argument.

Filters the jobs currently on the queue.

Moves a job to another queue.

Returns a list of queues running on this node.

Re-initializes the given worker. This is intended to be used from within a worker's Honeydew.Worker.init_failed/0 callback. Using it anywhere else may cause undefined behavior, so at present, don't.

Resumes job processing for a queue.

Starts a queue under Honeydew's supervision tree.

Starts workers under Honeydew's supervision tree.

Returns the current status of the queue and all attached workers.

Stops the local instance of the provided queue name.

Stops the local workers for the provided queue name.

Suspends job processing for a queue.

Returns a list of queues that have workers running on this node.

Waits for a job to complete and returns its result.

Types

Specs

async_opt() :: {:reply, true} | {:delay_secs, pos_integer()}

Specs

filter() :: (Honeydew.Job.t() -> boolean()) | atom()

Specs

mod_or_mod_args() :: module() | {module(), args :: term()}

Specs

queue_name() :: String.t() | atom() | {:global, String.t() | atom()}

Specs

queue_spec_opt() ::
  {:queue, mod_or_mod_args()}
  | {:dispatcher, mod_or_mod_args()}
  | {:failure_mode, mod_or_mod_args() | nil}
  | {:success_mode, mod_or_mod_args() | nil}
  | {:supervisor_opts, supervisor_opts()}
  | {:suspended, boolean()}

Specs

result() :: term()

Result of a Honeydew.Job

Specs

status_opt() :: {:timeout, pos_integer()}

Specs

supervisor_opts() :: Keyword.t()

Specs

task() :: {atom(), [arg :: term()]}

Specs

worker_opt() ::
  {:num, non_neg_integer()}
  | {:init_retry_secs, pos_integer()}
  | {:shutdown, non_neg_integer()}
  | {:nodes, [node()]}

Specs

worker_opts() :: [worker_opt()]

Functions

async(task, queue, opts \\ [])

Specs

async(task(), queue_name(), [async_opt()]) :: Honeydew.Job.t() | no_return()

Runs a task asynchronously.

Raises a RuntimeError if the queue process is not available.

You can provide any of the following opts:

  • reply: returns the result of the job via yield/1 (see below).
  • delay_secs: delays the execution of the job by the provided number of seconds.

Examples

To run a task asynchronously:

Honeydew.async({:ping, ["127.0.0.1"]}, :my_queue)

To run a task asynchronously and wait for result:

# Without pipes
job = Honeydew.async({:ping, ["127.0.0.1"]}, :my_queue, reply: true)
Honeydew.yield(job)

# With pipes
:pong =
  {:ping, ["127.0.0.1"]}
  |> Honeydew.async(:my_queue, reply: true)
  |> Honeydew.yield()

To run a task an hour later:

Honeydew.async({:ping, ["127.0.0.1"]}, :my_queue, delay_secs: 60*60)
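For context, the {:ping, ["127.0.0.1"]} task used in these examples names a function and its argument list. The following is a minimal sketch of a worker module that could serve such a task, assuming a stateless worker whose function is called with exactly the listed arguments. PingWorker, PingClient and the stubbed ping/1 body are hypothetical names for illustration, not part of Honeydew.

```elixir
defmodule PingWorker do
  # Hypothetical worker module for the {:ping, [ip]} task.
  # Assumption: a stateless worker is called as ping(ip).
  def ping(ip) when is_binary(ip) do
    # A real worker would contact the host; this stub always succeeds.
    :pong
  end
end

defmodule PingClient do
  # Hypothetical helper: enqueues the task with reply: true and waits
  # up to `timeout` milliseconds for the result.
  def ping_via_queue(ip, queue \\ :my_queue, timeout \\ 5_000) do
    {:ping, [ip]}
    |> Honeydew.async(queue, reply: true)
    |> Honeydew.yield(timeout)
  end
end
```

PingClient.ping_via_queue/3 only works once the queue and its workers have been started, for example with start_queue/2 and start_workers/3.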

Specs

cancel(Honeydew.Job.t()) :: :ok | {:error, :in_progress} | {:error, :not_found}

Cancels a job.

The return value depends on the status of the job.

  • :ok - Job had not been started and was able to be cancelled.
  • {:error, :in_progress} - Job was in progress and unable to be cancelled.
  • {:error, :not_found} - Job was not found on the queue (or already processed) and was unable to be cancelled.
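Since cancel/1 has three documented outcomes, callers will usually want to handle each one explicitly. A minimal sketch, assuming only the return values listed above; the CancelExample module and its messages are hypothetical:

```elixir
defmodule CancelExample do
  # Maps each documented cancel/1 return value to a log-friendly message.
  def describe(:ok), do: "cancelled before it started"
  def describe({:error, :in_progress}), do: "already running, not cancelled"
  def describe({:error, :not_found}), do: "not on the queue (possibly already processed)"

  # Cancels the job and describes the outcome.
  def cancel_and_describe(job) do
    job
    |> Honeydew.cancel()
    |> describe()
  end
end
```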

Specs

cancel(Honeydew.Job.private(), queue_name()) ::
  :ok | {:error, :in_progress} | {:error, :not_found}

Cancels the job associated with the first argument.

For example, for the Ecto Poll Queue, the first argument is the value of an ID from your schema.

The return value depends on the status of the job.

  • :ok - Job had not been started and was able to be cancelled.
  • {:error, :in_progress} - Job was in progress and unable to be cancelled. The Ecto Poll Queue does not support this return value.
  • {:error, :not_found} - Job was not found on the queue (or already processed) and was unable to be cancelled.

Specs

filter(queue_name(), filter()) :: [Honeydew.Job.t()]

Filters the jobs currently on the queue.

Filtration support depends on the queue implementation.

ErlangQueue and Mnesia queues support filtering with functions.

Ecto Poll Queues have pre-defined, named filters. At the moment, only :abandoned is implemented.

Note:

  • This function returns a List, not a Stream, so calling it can be memory intensive when invoked on a large queue.
  • The filtration is done by the queue process, not the client, so a heavy filter will tie up the queue.

Examples

Filter jobs with a specific task.

Honeydew.filter(:my_queue, &match?(%Honeydew.Job{task: {:ping, _}}, &1)) # ErlangQueue or Mnesia

Honeydew.filter(:my_queue, %{task: {:ping, ["127.0.0.1"]}}) # Mnesia

Honeydew.filter(:my_queue, :abandoned) # Ecto queue

Return all jobs.

Honeydew.filter(:my_queue, fn _ -> true end)
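For queues that accept function filters, the predicate can also be built by a small helper so it is reusable across calls. This is a sketch only: JobFilters is a hypothetical module, and it matches on the :task field alone, so it does not depend on any other field of the job.

```elixir
defmodule JobFilters do
  # Returns a predicate that is true for jobs whose task calls `function_name`.
  # It matches only on the :task field, so any map or struct carrying
  # that field can be tested.
  def by_function(function_name) when is_atom(function_name) do
    fn
      %{task: {^function_name, _args}} -> true
      _other -> false
    end
  end
end

# Usage, assuming a running queue that supports function filters:
# Honeydew.filter(:my_queue, JobFilters.by_function(:ping))
```

As noted above, the predicate runs inside the queue process, so it should stay cheap.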

Specs

move(Honeydew.Job.t(), to_queue :: queue_name()) ::
  Honeydew.Job.t() | no_return()

Moves a job to another queue.

Raises a RuntimeError if to_queue is not available.

This function first enqueues the job on to_queue, and then tries to cancel it on its current queue. This means there's a possibility a job could be processed on both queues. This behavior is consistent with Honeydew's at-least-once execution goal.

This function is most helpful on a queue where there are no workers (like a dead letter queue), because the job won't be processed out from under the queue.
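Combining filter/2 and move/2 gives a simple way to drain a worker-less queue back into a live one. The sketch below is an illustration under assumptions: the DeadLetter module and the queue names are hypothetical, and the source queue must support function filters. The last two parameters default to the Honeydew functions and exist only so the sketch can be tried with stand-ins.

```elixir
defmodule DeadLetter do
  # Moves every job on `from_queue` to `to_queue` and returns how many
  # jobs were moved. `filter` and `move` default to Honeydew.filter/2
  # and Honeydew.move/2.
  def requeue_all(from_queue, to_queue, filter \\ &Honeydew.filter/2, move \\ &Honeydew.move/2) do
    from_queue
    |> filter.(fn _job -> true end)
    |> Enum.map(fn job -> move.(job, to_queue) end)
    |> length()
  end
end

# Usage, assuming both queues are running:
# DeadLetter.requeue_all(:dead_letters, :my_queue)
```

Because move/2 enqueues before cancelling, running this against a queue that does have workers could result in jobs being processed twice.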

This function is deprecated. Honeydew now supervises your queue processes; please use `Honeydew.start_queue/2` instead.

Specs

queues() :: [queue_name()]

Returns a list of queues running on this node.

Specs

reinitialize_worker() :: :ok

Re-initializes the given worker. This is intended to be used from within a worker's Honeydew.Worker.init_failed/0 callback. Using it anywhere else may cause undefined behavior, so at present, don't.

Specs

resume(queue_name()) :: :ok

Resumes job processing for a queue.

start_queue(name, opts \\ [])

Specs

start_queue(queue_name(), [queue_spec_opt()]) :: :ok | {:error, term()}

Starts a queue under Honeydew's supervision tree.

name is how you'll refer to the queue to add a task.

You can provide any of the following opts:

  • queue: the module that the queue will use. Defaults to Honeydew.Queue.Mnesia. You may also provide args to the queue's Honeydew.Queue.init/2 callback using the following format: {module, args}.

  • dispatcher: the job dispatching strategy, {module, init_args}.

  • failure_mode: the way that failed jobs should be handled. You can pass either a module, or {module, args}. The module must implement the Honeydew.FailureMode behaviour. Defaults to {Honeydew.FailureMode.Abandon, []}.

  • success_mode: a callback that runs when a job successfully completes. You can pass either a module, or {module, args}. The module must implement the Honeydew.SuccessMode behaviour. Defaults to nil.

  • suspended: Start queue in suspended state. Defaults to false.

For example:

  • Honeydew.start_queue("my_awesome_queue")

  • Honeydew.start_queue("my_awesome_queue", queue: {MyQueueModule, [ip: "localhost"]}, dispatcher: {Honeydew.Dispatcher.MRU, []})

Note that the failure_mode or success_mode handler is run in the job's dedicated monitor process. This means the handlers for multiple jobs can run concurrently, but they can also crash that process.
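To illustrate the {module, args} form of failure_mode, here is a sketch that builds the option list separately from the start_queue/2 call. QueueSetup is a hypothetical module. Honeydew.FailureMode.Retry and its times: argument are assumptions based on Honeydew's bundled failure modes, so check the module's own documentation before relying on them.

```elixir
defmodule QueueSetup do
  # Options for a queue that retries failed jobs a few times.
  # Assumption: Honeydew.FailureMode.Retry accepts a `times:` argument.
  def opts do
    [
      failure_mode: {Honeydew.FailureMode.Retry, [times: 3]},
      suspended: false
    ]
  end

  # Starts the queue under Honeydew's supervision tree.
  def start do
    Honeydew.start_queue("my_awesome_queue", opts())
  end
end
```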

start_workers(name, module_and_args, opts \\ [])

Starts workers under Honeydew's supervision tree.

name is the name of the queue that the workers pull jobs from.

module is the module that the workers in your queue will use. You may also provide Honeydew.Worker.init/1 args with {module, args}.

You can provide any of the following opts:

  • num: the number of workers to start. Defaults to 10.

  • init_retry_secs: the amount of time, in seconds, a stateful worker waits before trying to re-initialize after its Honeydew.Worker.init/1 function fails. You can also override this behavior by implementing the Honeydew.Worker.init_failed/0 callback (see README/workers.md).

  • shutdown: if a worker is in the middle of a job, the amount of time, in milliseconds, to wait before brutally killing it. Defaults to 10_000.

  • nodes: for :global queues, you can provide a list of nodes to stay connected to (your queue node and enqueuing nodes). Defaults to [].

For example:

  • Honeydew.start_workers("my_awesome_queue", MyJobModule)

  • Honeydew.start_workers("my_awesome_queue", {MyJobModule, [key: "secret key"]}, num: 3)

  • Honeydew.start_workers({:global, "my_awesome_queue"}, MyJobModule, nodes: [:clientfacing@dax, :queue@dax])
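The {module, args} form passes args to the worker's init/1. A minimal sketch of what such a stateful worker might look like, assuming init/1 returns {:ok, state} and that the state is then supplied as the final argument of each job function. KeyedWorker and sign/2 are hypothetical names; README/workers.md is the authoritative description.

```elixir
defmodule KeyedWorker do
  # Receives the args given as {KeyedWorker, args} to start_workers/3.
  # Assumption: returning {:ok, state} makes this a stateful worker.
  def init(opts) do
    {:ok, %{key: Keyword.fetch!(opts, :key)}}
  end

  # Assumption: the worker's state arrives as the last argument, so the
  # task {:sign, ["hello"]} would call sign("hello", state).
  def sign(payload, %{key: key}) do
    "#{payload}:#{key}"
  end
end

# Usage, assuming the queue is already running:
# Honeydew.start_workers("my_awesome_queue", {KeyedWorker, [key: "secret key"]}, num: 3)
```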

status(queue, opts \\ [])

Specs

status(queue_name(), [status_opt()]) :: map()

Returns the current status of the queue and all attached workers.

You can provide any of the following opts:

  • timeout: specifies the time (in milliseconds) the calling process will wait for the queue to return the status. Note that this timeout does not cancel the status callback execution in the queue.

Specs

stop_queue(queue_name()) :: :ok | {:error, :not_running}

Stops the local instance of the provided queue name.

Specs

stop_workers(queue_name()) :: :ok | {:error, :not_running}

Stops the local workers for the provided queue name.

Specs

suspend(queue_name()) :: :ok

Suspends job processing for a queue.

worker_spec(queue, module_and_args, opts)

This function is deprecated. Honeydew now supervises your worker processes; please use `Honeydew.start_workers/3` instead.

Specs

workers() :: [queue_name()]

Returns a list of queues that have workers running on this node.

yield(job, timeout \\ 5000)

Specs

yield(Honeydew.Job.t(), timeout()) :: {:ok, result()} | nil | no_return()

Waits for a job to complete and returns its result.

Returns {:ok, result} when the job completes, or nil on timeout. Raises an ArgumentError unless the job was created with reply: true by the current process.

Example

Calling yield/2 with different timeouts.

iex> job = Honeydew.async({:ping, ["127.0.0.1"]}, :my_queue, reply: true)
iex> Honeydew.yield(job, 500) # Wait half a second
nil
# Result comes in at 1 second
iex> Honeydew.yield(job, 1000) # Wait up to a second
{:ok, :pong}
iex> Honeydew.yield(job, 0)
nil # <- because the message has already arrived and been handled

The only time yield/2 would ever return the result more than once is if the job executes more than once (as Honeydew aims for at-least-once execution).
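Because yield/2 returns nil on timeout rather than an error tuple, it can be convenient to normalize its result before passing it on. A minimal sketch, assuming only the documented return values; YieldExample and the {:error, :timeout} convention are hypothetical choices, not Honeydew's API:

```elixir
defmodule YieldExample do
  # Normalizes the documented yield/2 return values into tagged tuples.
  def interpret({:ok, result}), do: {:ok, result}
  def interpret(nil), do: {:error, :timeout}

  # Waits for the job's reply and normalizes the outcome.
  # The job must have been created with reply: true by the calling process.
  def await(job, timeout \\ 5_000) do
    job
    |> Honeydew.yield(timeout)
    |> interpret()
  end
end
```

A nil from yield/2 only means the reply has not arrived within the timeout; the job may still be queued or running.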