View Source OddJob (OddJob v0.5.1)

Job pools for Elixir OTP applications, written in Elixir.

Use OddJob when you need to limit concurrency of background processing in your Elixir app, like forcing backpressure on calls to databases or external APIs. OddJob is easy to use and configure, and provides functions for fire-and-forget jobs, async/await calls where the results must be returned, and job scheduling. Job queues are stored in process memory so no database is required.

Installation

The package can be installed by adding odd_job to your list of dependencies in mix.exs:

def deps do
[
  {:odd_job, "~> 0.4.0"}
]
end

Getting started

After installation you can start processing jobs right away. OddJob automatically starts up a supervised job pool of 5 workers out of the box with no configuration required. The default name of this job pool is OddJob.Pool, and it can be sent work in the following way:

OddJob.perform(OddJob.Pool, fn -> do_some_work() end)

You can skip ahead for more usage, or read on for a guide to configuring your job pools.

Configuration

The default pool can be customized in your config if you want to change the pool size:

config :odd_job,
  pool_size: 10 # defaults to the number of schedulers online

If you are processing jobs that have a high chance of failure, you may want to customize the max_restarts and max_seconds options to prevent all the workers in a pool from restarting if too many jobs are failing. These options default to the Supervisor defaults (max_restarts: 3, max_seconds: 5) and can be overridden in your config:

config :odd_job,
  pool_size: 10,
  max_restarts: 10,
  max_seconds: 2

Extra pools

You can add extra pools to be supervised by the OddJob application supervision tree:

config :odd_job,
  extra_pools: [MyApp.Email, MyApp.ExternalService]

By default, extra pools will be configured with the same options as your default pool. Luckily, extra pools can receive their own list of overrides:

config :odd_job,
  pool_size: 10,
  max_restarts: 5,
  extra_pools: [
    MyApp.Email, # MyApp.Email will use the defaults
    "MyApp.ExternalService": [ # the MyApp.ExternalService pool gets its own config
      pool_size: 5,
      max_restarts: 2
    ]
  ]

Next we'll see how you can add job pools to your own application's supervision tree. If you don't want OddJob to supervise any pools for you (including the default OddJob.Pool pool) do not set a value for :extra_pools and pass false to the :default_pool config key:

config :odd_job, default_pool: false

Supervising job pools

You can dynamically start a new job pool linked to the current process by calling OddJob.start_link/1:

{:ok, pid} = OddJob.start_link(name: MyApp.Email, pool_size: 10)
OddJob.perform(MyApp.Email, fn -> do_something() end)
#=> :ok

The first argument to the function is the name of the pool, the second argument is a keyword list of options to configure the pool. See the OddJob.start_link/1 documentation for more details.

In most cases you'll want to supervise your job pools, which you can do by adding a tuple in the form of {OddJob, options} directly to the top level of your application's supervision tree or any other list of child specs for a supervisor:

defmodule MyApp.Application do
  use Application

  def start(_type, _args) do

    children = [
      {OddJob, name: MyApp.Email},
      {OddJob, name: MyApp.ExternalService}
    ]

    opts = [strategy: :one_for_one, name: MyApp.Supervisor]
    Supervisor.start_link(children, opts)
  end
end

The tuple {OddJob, name: MyApp.Email} will return a child spec for a supervisor that will start and supervise the MyApp.Email pool. The second element of the tuple must be a keyword list of options with a :name key and a unique name value as an atom. You can supervise as many pools as you want, as long as they have unique names.

Any default configuration options listed in your config.exs will also apply to your own supervised pools. You can override the config for any pool by specifying the configuration in your child spec options:

children = [
  # The MyApp.Email pool will use the default config:
  {OddJob, name: MyApp.Email},
  # The MyApp.ExternalService pool will not:
  {OddJob, name: MyApp.ExternalService, pool_size: 20, max_restarts: 10}
]

Module-based pools

You may want to configure your pool at runtime, or wrap your logic in a custom API. Module-based pools are great for this. Invoking use OddJob.Pool defines a child_spec/1 function that can be used to start your pool under a supervisor.

Imagine you want to start a job pool with a dynamically configurable pool size and wrap it in a client API:

defmodule MyApp.Email do
  use OddJob.Pool

  def start_link(init_arg) do
    OddJob.start_link(name: __MODULE__, pool_size: init_arg)
  end

  # Client API

  def send_email(user) do
    OddJob.perform(__MODULE__, fn -> MyApp.Mailer.send(user) end)
  end
end

Now you can supervise your pool and set the pool size in a child spec tuple:

children = [
  {MyApp.Email, 20}
]

Supervisor.start_link(children, strategy: :one_for_one)

You can also skip the initial argument by passing MyApp.Email on its own:

# in my_app/application.ex

children = [
  MyApp.Email # Same as {MyApp.Email, []}
]

Supervisor.start_link(children, strategy: :one_for_one)

# in my_app/email.ex

defmodule MyApp.Email do
  use OddJob.Pool

  def start_link(_init_arg) do
    OddJob.start_link(name: __MODULE__)
  end
end

For convenience, use OddJob.Pool automatically defines an overridable start_link/1 function just like the one above, that ignores the initial argument and names the pool after the module, using the default configuration options. This means the above example is equivalent to:

defmodule MyApp.Email do
  use OddJob.Pool
end

You can pass any supervision start options to use OddJob.Pool:

use OddJob.Pool, restart: :transient, shutdown: :brutal_kill

The default options are the same as any Supervisor. See the Supervisor module for more info on supervision start options.

All of the previously mentioned config options can be combined. You can have a default pool, extra pools in the OddJob supervision tree, and pools to be supervised by your own application, all of which can either use the default config or their own overrides.

Usage

A job pool can be sent jobs by passing its unique name and an anonymous function to one of the OddJob module's perform functions:

job = OddJob.async_perform(MyApp.ExternalService, fn -> get_data(user) end)
# do something else
data = OddJob.await(job)
OddJob.perform(MyApp.Email, fn -> send_email(user, data) end)

If a worker in the pool is available then the job will be performed right away. If all of the workers are already assigned to other jobs then the new job will be added to a FIFO queue. Jobs in the queue are performed as workers become available.

Use perform/2 for immediate fire and forget jobs where you don't care about the results or if it succeeds. async_perform/2 and await/1 follow the async/await pattern in the Task module, and are useful when you need to retrieve the results and you care about success or failure. Similarly to Task.async/1, async jobs will be linked and monitored by the caller (in this case, through a proxy). If either the caller or the job crash or exit, the other will crash or exit with the same reason.

Scheduled jobs

Jobs can be scheduled for later execution with perform_after/3 and perform_at/3:

OddJob.perform_after(1_000_000, OddJob.Pool, fn -> clean_database() end) # accepts a timer in milliseconds

time = DateTime.utc_now |> DateTime.add(60 * 60 * 24, :second) # 24 hours from now
OddJob.perform_at(time, OddJob.Pool, fn -> verify_work_is_done() end) # accepts a future DateTime struct

The scheduling functions return a unique timer reference which can be read with Process.read_timer/ 1 and cancelled with OddJob.cancel_timer/1, which will cancel execution of the job itself and clean up after itself by causing the scheduler process to exit. When the timer is up the job will be sent to the pool and can no longer be aborted.

ref = OddJob.perform_after(5000, OddJob.Pool, fn -> :will_be_canceled end)

# somewhere else in your code
if some_condition() do
  OddJob.cancel_timer(ref)
end

Note that there is no guarantee that a scheduled job will be executed immediately when the timer runs out. Like all jobs it is sent to the pool and if all workers are busy then the job enters the queue to be performed as soon as a worker is available.

License

MIT - Copyright (c) 2022 M. Simon Borg

Link to this section Summary

Functions

Performs an async job that can be awaited on for the result.

Sends a collection of async jobs to the pool.

Awaits on an async job reply and returns the results.

Awaits replies from multiple async jobs and returns them in a list.

Cancels a scheduled job.

Returns a specification to start this module under a supervisor.

Performs a fire and forget job.

Sends a job to the pool after the given timer has elapsed.

Sends a job to the pool at the given time.

Sends a collection of jobs to the pool.

Sends a collection of jobs to the pool after the given timer has elapsed.

Sends a collection of jobs to the pool at the given time.

A macro for creating jobs with an expressive DSL.

A macro for creating jobs with an expressive DSL.

Returns the pid of the OddJob.Pool.Supervisor that supervises the job pool's workers.

Returns the registered name in :via of the OddJob.Pool.Supervisor that supervises the pool's workers

Returns a two element tuple {pid, state} with the pid of the job queue as the first element. The second element is the state of the queue represented by an OddJob.Queue struct.

Returns the registered name in :via for the queue process. See the Registry module for more information on :via process name registration.

Starts an OddJob.Pool supervision tree linked to the current process.

Returns the pid of the top level supervisor in the pool supervision tree, nil if a process can't be found.

Returns a list of pids for the specified worker pool.

Link to this section Types

Specs

child_spec() :: OddJob.Pool.child_spec()

Specs

date_time() :: DateTime.t()

Specs

job() :: OddJob.Job.t()

Specs

name() :: OddJob.Registry.name()

Specs

not_found() :: {:error, :not_found}

Specs

options() :: OddJob.Pool.options()

Specs

pool() :: atom()

Specs

queue() :: OddJob.Queue.t()

Specs

start_option() :: OddJob.Pool.start_option()

Specs

timer() :: non_neg_integer()

Link to this section Functions

Link to this function

async_perform(pool, function)

View Source (since 0.1.0)

Specs

async_perform(pool(), function()) :: job() | not_found()

Performs an async job that can be awaited on for the result.

Returns the OddJob.Job.t() struct if the pool server exists at the time of calling, or {:error, :not_found} if it doesn't.

Functions similarly to Task.async/1 and Task.await/2 with some tweaks. Since the workers in the pool already exist when this function is called and may be busy performing other jobs, it is unknown at the time of calling when the job will be performed and by which worker process.

An OddJob.Async.Proxy is dynamically created and linked and monitored by the caller. The Proxy handles the transactions with the pool and is notified when a worker is assigned. The proxy links and monitors the worker immediately before performance of the job, forwarding any exits or crashes to the caller. The worker returns the results to the proxy, which then passes them to the caller with a {ref, result} message. The proxy immediately exits with reason :normal after the results are sent back to the caller.

See await/2 and await_many/2 for functions to retrieve the results, and the Task module for more information on the async/await patter and when you may want to use it.

Examples

iex> job = OddJob.async_perform(OddJob.Pool, fn -> :math.exp(100) end)
iex> OddJob.await(job)
2.6881171418161356e43

iex> OddJob.async_perform(:does_not_exist, fn -> "never called" end)
{:error, :not_found}
Link to this function

async_perform_many(pool, collection, function)

View Source (since 0.4.0)

Specs

async_perform_many(pool(), list() | map(), function()) :: [job()] | not_found()

Sends a collection of async jobs to the pool.

Returns a list of OddJob.Job.t() structs if the pool server exists at the time of calling, or {:error, :not_found} if it doesn't.

Enumerates over the collection, using each member as the argument to an anonymous function with an arity of 1. See async_perform/2 and perform/2 for more information.

There's a limit to the number of jobs that can be started with this function that roughly equals the BEAM's process limit.

Examples

iex> jobs = OddJob.async_perform_many(OddJob.Pool, 1..5, fn x -> x ** 2 end)
iex> OddJob.await_many(jobs)
[1, 4, 9, 16, 25]

iex> OddJob.async_perform_many(:does_not_exist, ["never", "called"], fn x -> x end)
{:error, :not_found}
Link to this function

await(job, timeout \\ 5000)

View Source (since 0.1.0)

Specs

await(job(), timeout()) :: term()

Awaits on an async job reply and returns the results.

The current process and the worker process are linked during execution of the job by an OddJob.Async.Proxy. In case the worker process dies during execution, the current process will exit with the same reason as the worker.

A timeout, in milliseconds or :infinity, can be given with a default value of 5000. If the timeout is exceeded, then the current process will exit. If the worker process is linked to the current process which is the case when a job is started with async, then the worker process will also exit.

This function assumes the proxy's monitor is still active or the monitor's :DOWN message is in the message queue. If it has been demonitored, or the message already received, this function will wait for the duration of the timeout awaiting the message.

Examples

iex> OddJob.async_perform(OddJob.Pool, fn -> :math.log(2.6881171418161356e43) end)
...> |> OddJob.await()
100.0
Link to this function

await_many(jobs, timeout \\ 5000)

View Source (since 0.2.0)

Specs

await_many([job()], timeout()) :: [term()]

Awaits replies from multiple async jobs and returns them in a list.

This function receives a list of jobs and waits for their replies in the given time interval. It returns a list of the results, in the same order as the jobs supplied in the jobs input argument.

The current process and each worker process are linked during execution of the job by an OddJob.Async.Proxy. If any of the worker processes dies, the caller process will exit with the same reason as that worker.

A timeout, in milliseconds or :infinity, can be given with a default value of 5000. If the timeout is exceeded, then the caller process will exit. Any worker processes that are linked to the caller process (which is the case when a job is started with async_perform/2) will also exit.

This function assumes the proxies' monitors are still active or the monitor's :DOWN message is in the message queue. If any jobs have been demonitored, or the message already received, this function will wait for the duration of the timeout.

Examples

iex> job1 = OddJob.async_perform(OddJob.Pool, fn -> 2 ** 2 end)
iex> job2 = OddJob.async_perform(OddJob.Pool, fn -> 3 ** 2 end)
iex> [job1, job2] |> OddJob.await_many()
[4, 9]
Link to this function

cancel_timer(timer_ref)

View Source (since 0.2.0)

Specs

cancel_timer(reference()) :: non_neg_integer() | false

Cancels a scheduled job.

timer_ref is the unique reference returned by perform_at/3 or perform_after/3. This function returns the number of milliseconds left in the timer when cancelled, or false if the timer already expired. If the return is false you can assume that the job has already been sent to the pool for execution.

NOTE: Cancelling the timer with this function ensures that the job is never executed and that the scheduler process is exited and not left "hanging". Using Process.cancel_timer/1 will also cancel execution, but may leave hanging processes. A hanging scheduler process will eventually timeout, but not until one second after the expiration of the original timer.

Examples

iex> ref = OddJob.perform_after(500, OddJob.Pool, fn -> :never end)
iex> time = OddJob.cancel_timer(ref)
iex> is_integer(time)
true

iex> ref = OddJob.perform_after(10, OddJob.Pool, fn -> :never end)
iex> Process.sleep(11)
iex> OddJob.cancel_timer(ref)
false
Link to this function

child_spec(opts)

View Source (since 0.1.0)

Specs

child_spec(options()) :: child_spec()

Returns a specification to start this module under a supervisor.

Examples

iex> children = [OddJob.child_spec(name: MyApp.Media)]
[%{id: MyApp.Media, start: {OddJob.Pool, :start_link, [[name: MyApp.Media]]}, type: :supervisor}]
iex> {:ok, _pid} = Supervisor.start_link(children, strategy: :one_for_one)
iex> (MyApp.Media |> OddJob.workers() |> length()) == System.schedulers_online()
true

Normally you would start an OddJob pool under a supervision tree with a child specification tuple and not call child_spec/1 directly.

children = [{OddJob, name: MyApp.Media}]
Supervisor.start_link(children, strategy: :one_for_one)

You must pass a keyword list of options to child_spec/1, or as the second element of the child specification tuple.

Options

  • :name - An atom that names the pool. This argument is required.

  • :pool_size - A positive integer, the number of concurrent workers in the pool. Defaults to 5 or your application's config value.

  • :max_restarts - A positive integer, the number of worker restarts allowed in a given timeframe before all of the workers are restarted. Set a higher number if your jobs have a high rate of expected failure. Defaults to 5 or your application's config value.

  • :max_seconds - A positive integer, the timeframe in seconds in which max_restarts applies. Defaults to 3 or your application's config value. See Supervisor for more info on restart intensity options.

See the Supervisor module for more about child specs.

Link to this function

perform(pool, function)

View Source (since 0.1.0)

Specs

perform(pool(), function()) :: :ok | not_found()

Performs a fire and forget job.

Casts the function to the job pool, and if a worker is available it is performed immediately. If no workers are available the job is added to the back of a FIFO queue to be performed as soon as a worker is available.

Returns :ok if the pool process exists at the time of calling, or {:error, :not_found} if it doesn't. The job is casted to the pool, so there can be no guarantee that the pool process will still exist if it crashes between the beginning of the function call and when the message is received.

Use this function for interaction with other processes or the outside world where you don't care about the return results or whether it succeeds.

Examples

iex> parent = self()
iex> :ok = OddJob.perform(OddJob.Pool, fn -> send(parent, :hello) end)
iex> receive do
...>   msg -> msg
...> end
:hello

iex> OddJob.perform(:does_not_exist, fn -> "never called" end)
{:error, :not_found}
Link to this function

perform_after(timer, pool, fun)

View Source (since 0.2.0)

Specs

perform_after(timer(), pool(), function()) :: reference() | not_found()

Sends a job to the pool after the given timer has elapsed.

timer is an integer that indicates the number of milliseconds that should elapse before the job is sent to the pool. The timed message is executed under a separate supervised process, so if the current process crashes the job will still be performed. A timer reference is returned, which can be read with Process.read_timer/1 or cancelled with OddJob.cancel_timer/1. Returns {:error, :not_found} if the pool server does not exist at the time this function is called.

Examples

timer_ref = OddJob.perform_after(5000, OddJob.Pool, fn -> deferred_job() end)
Process.read_timer(timer_ref)
#=> 2836 # time remaining before job is sent to the pool
OddJob.cancel_timer(timer_ref)
#=> 1175 # job has been cancelled

timer_ref = OddJob.perform_after(5000, OddJob.Pool, fn -> deferred_job() end)
Process.sleep(6000)
OddJob.cancel_timer(timer_ref)
#=> false # too much time has passed to cancel the job

iex> OddJob.perform_after(5000, :does_not_exist, fn -> "never called" end)
{:error, :not_found}
Link to this function

perform_at(date_time, pool, fun)

View Source (since 0.2.0)

Specs

perform_at(date_time(), pool(), function()) :: reference() | not_found()

Sends a job to the pool at the given time.

time must be a DateTime struct for a time in the future. The timer is executed under a separate supervised process, so if the caller crashes the job will still be performed. A timer reference is returned, which can be read with Process.read_timer/1 or canceled with OddJob.cancel_timer/1. Returns {:error, :not_found} if the pool server does not exist at the time this function is called.

Examples

time = DateTime.utc_now() |> DateTime.add(600, :second)
OddJob.perform_at(time, OddJob.Pool, fn -> scheduled_job() end)

iex> time = DateTime.utc_now() |> DateTime.add(600, :second)
iex> OddJob.perform_at(time, :does_not_exist, fn -> "never called" end)
{:error, :not_found}
Link to this function

perform_many(pool, collection, function)

View Source (since 0.4.0)

Specs

perform_many(pool(), list() | map(), function()) :: :ok | not_found()

Sends a collection of jobs to the pool.

Enumerates over the collection, using each member as the argument to an anonymous function witn an arity of 1. The collection of jobs is sent in bulk to the pool where they will be immediately performed by any available workers. If there are no available workers the jobs are added to the FIFO queue. Jobs are guaranteed to always process in the same order of the original collection, and insertion into the queue is atomic. No jobs sent from other processes can be inserted between any member of the collection.

Returns :ok if the pool process exists at the time of calling, or {:error, :not_found} if it doesn't. See perform/2 for more.

Examples

iex> caller = self()
iex> OddJob.perform_many(OddJob.Pool, 1..5, fn x -> send(caller, x) end)
iex> for x <- 1..5 do
...>   receive do
...>     ^x -> x
...>   end
...> end
[1, 2, 3, 4, 5]

iex> OddJob.perform_many(:does_not_exist, ["never", "called"], fn x -> x end)
{:error, :not_found}
Link to this function

perform_many_after(timer, pool, collection, function)

View Source (since 0.4.0)

Specs

perform_many_after(timer(), pool(), list() | map(), function()) ::
  [job()] | not_found()

Sends a collection of jobs to the pool after the given timer has elapsed.

Enumerates over the collection, using each member as the argument to an anonymous function with an arity of 1. Returns a single timer reference. The timer is watched by a single process that will send the entire batch of jobs to the pool when the timer expires. See perform_after/3 for more information about arguments and timers.

Consider using this function instead of perform_after/3 when scheduling a large batch of jobs. perform_after/3 starts a separate scheduler process per job, whereas this function starts a single scheduler process for the whole batch.

Returns {:error, :not_found} if the pool does not exist at the time the function is called.

Examples

timer_ref = OddJob.perform_many_after(5000, OddJob.Pool, 1..5, fn x -> x ** 2 end)
#=> #Reference<0.1431903625.286261254.39156>
# fewer than 5 seconds pass
OddJob.cancel_timer(timer_ref)
#=> 2554 # returns the time remaining. All jobs in the collection have been canceled

timer_ref = OddJob.perform_many_after(5000, OddJob.Pool, 1..5, fn x -> x **2 end)
#=> #Reference<0.1431903625.286261254.39178>
# more than 5 seconds pass
OddJob.cancel_timer(timer_ref)
#=> false # too much time has passed, all jobs have been queued

iex> OddJob.perform_many_after(5000, :does_not_exist, ["never", "called"], fn x -> x end)
{:error, :not_found}
Link to this function

perform_many_at(date_time, pool, collection, function)

View Source (since 0.4.0)

Specs

perform_many_at(date_time(), pool(), list() | map(), function()) ::
  reference() | not_found()

Sends a collection of jobs to the pool at the given time.

Enumerates over the collection, using each member as the argument to an anonymous function with an arity of 1. Returns a single timer reference. The timer is watched by a single process that will send the entire batch of jobs to the pool when the timer expires. See perform_at/3 for more information about arguments and timers.

Consider using this function instead of perform_at/3 when scheduling a large batch of jobs. perform_at/3 starts a separate scheduler process per job, whereas this function starts a single scheduler process for the whole batch.

Returns {:error, :not_found} if the pool does not exist at the time the function is called.

Examples

time = DateTime.utc_now() |> DateTime.add(5, :second)
timer_ref = OddJob.perform_many_at(time, OddJob.Pool, 1..5, fn x -> x ** 2 end)
#=> #Reference<0.1431903625.286261254.39156>
# fewer than 5 seconds pass
OddJob.cancel_timer(timer_ref)
#=> 2554 # returns the time remaining. All jobs in the collection have been canceled

time = DateTime.utc_now() |> DateTime.add(5, :second)
timer_ref = OddJob.perform_many_at(time, OddJob.Pool, 1..5, fn x -> x **2 end)
#=> #Reference<0.1431903625.286261254.39178>
# more than 5 seconds pass
OddJob.cancel_timer(timer_ref)
#=> false # too much time has passed, all jobs have been queued

iex> time = DateTime.utc_now() |> DateTime.add(5, :second)
iex> OddJob.perform_many_at(time, :does_not_exist, ["never", "called"], fn x -> x end)
{:error, :not_found}
Link to this macro

perform_this(pool, contents)

View Source (since 0.3.0) (macro)

A macro for creating jobs with an expressive DSL.

perform_this/2 works like perform/2 except it accepts a do block instead of an anonymous function.

Examples

You must import or require OddJob to use macros:

import OddJob
alias MyApp.Job

perform_this Job do
  some_work()
  some_other_work()
end

perform_this Job, do: something_hard()
Link to this macro

perform_this(pool, option, contents)

View Source (since 0.3.0) (macro)

A macro for creating jobs with an expressive DSL.

perform_this/3 accepts a single configuration option as the second argument that will control execution of the job. The available options provide the functionality of async_perform/2, perform_at/3, and perform_after/3.

Options

  • :async - Passing the atom :async as the second argument before the do block creates an async job that can be awaited on. See async_perform/2.

  • at: time - Use this option to schedule the job for a specific time in the future. time must be a valid Time or DateTime struct. See perform_at/3.

  • after: timer - Use this option to schedule the job to perform after the given timer has elapsed. timer must be in milliseconds. See perform_after/3.

Examples

import OddJob
alias MyApp.Job

time = ~T[03:00:00.000000]
perform_this Job, at: time do
  scheduled_work()
end

perform_this Job, after: 5000, do: something_important()

perform_this Job, :async do
  get_data()
end
|> await()

iex> (perform_this OddJob.Pool, :async, do: 10 ** 2) |> await()
100
Link to this function

pool_supervisor(pool)

View Source (since 0.4.0)

Specs

pool_supervisor(pool()) :: pid() | not_found()

Returns the pid of the OddJob.Pool.Supervisor that supervises the job pool's workers.

Returns {:error, :not_found} if the process does not exist at the time this function is called.

There is no guarantee that the process identified by the pid will still be alive after the results are returned, as it could exit or be killed or restarted at any time. Use pool_supervisor_name/1 to obtain the persistent registered name of the supervisor.

Examples

OddJob.pool_supervisor(OddJob.Pool)
#=> #PID<0.239.0>

iex> OddJob.pool_supervisor(:does_not_exist)
{:error, :not_found}
Link to this function

pool_supervisor_name(pool)

View Source (since 0.4.0)

Specs

pool_supervisor_name(pool()) :: name() | not_found()

Returns the registered name in :via of the OddJob.Pool.Supervisor that supervises the pool's workers

Returns {:error, :not_found} if the process does not exist at the time this function is called.

Examples

iex> OddJob.pool_supervisor_name(OddJob.Pool)
{:via, Registry, {OddJob.Registry, {OddJob.Pool, :pool_sup}}}

iex> OddJob.pool_supervisor_name(:does_not_exist)
{:error, :not_found}
Link to this function

queue(pool)

View Source (since 0.1.0)

Specs

queue(pool()) :: {pid(), queue()} | not_found()

Returns a two element tuple {pid, state} with the pid of the job queue as the first element. The second element is the state of the queue represented by an OddJob.Queue struct.

The queue state has the following fields:

  • :workers - A list of the pids of all workers in the pool

  • :assigned - The pids of the workers currently assigned to jobs

  • :jobs - A list of OddJob.Job structs in the queue awaiting execution

  • :pool - The Elixir term naming the pool that the queue belongs to

Returns {:error, :not_found} if the queue server does not exist at the time this function is called.

Examples

iex> {pid, %OddJob.Queue{pool: pool}} = OddJob.queue(OddJob.Pool)
iex> is_pid(pid)
true
iex> pool
OddJob.Pool

iex> OddJob.queue(:does_not_exist)
{:error, :not_found}
Link to this function

queue_name(pool)

View Source (since 0.4.0)

Specs

queue_name(pool()) :: name() | not_found()

Returns the registered name in :via for the queue process. See the Registry module for more information on :via process name registration.

Returns {:error, :not_found} if the queue server does not exist at the time this function is called.

Examples

iex> OddJob.queue_name(OddJob.Pool)
{:via, Registry, {OddJob.Registry, {OddJob.Pool, :queue}}}

iex> OddJob.queue_name(:does_not_exist)
{:error, :not_found}
Link to this function

start_link(opts)

View Source (since 0.4.0)

Specs

start_link(options()) :: Supervisor.on_start()

Starts an OddJob.Pool supervision tree linked to the current process.

You can start an OddJob pool dynamically with start_link/1 to start processing concurrent jobs:

iex> {:ok, _pid} = OddJob.start_link(name: MyApp.Event, pool_size: 10)
iex> OddJob.async_perform(MyApp.Event, fn -> :do_something end) |> OddJob.await()
:do_something

In most cases you would instead start your pools inside a supervision tree:

children = [{OddJob, name: MyApp.Event, pool_size: 10}]
Supervisor.start_link(children, strategy: :one_for_one)

See OddJob.child_spec/1 for a list of available options.

See Supervisor for more on child specifications and supervision trees.

Link to this function

whereis(pool)

View Source (since 0.4.0)

Specs

whereis(pool()) :: pid() | nil

Returns the pid of the top level supervisor in the pool supervision tree, nil if a process can't be found.

Example

iex> {:ok, pid} = OddJob.start_link(name: :whereis)
iex> OddJob.whereis(:whereis) == pid
true

iex> OddJob.whereis(:where_it_is_not) == nil
true
Link to this function

workers(pool)

View Source (since 0.1.0)

Specs

workers(pool()) :: [pid()] | not_found()

Returns a list of pids for the specified worker pool.

There is no guarantee that the processes will still be alive after the results are returned, as they could exit or be killed at any time.

Returns {:error, :not_found} if the pool does not exist at the time this function is called.

Examples

OddJob.workers(OddJob.Pool)
#=> [#PID<0.105.0>, #PID<0.106.0>, #PID<0.107.0>, #PID<0.108.0>, #PID<0.109.0>]

iex> OddJob.workers(:does_not_exist)
{:error, :not_found}