JobsPool

A simple concurrent jobs pool.

You can spawn jobs synchronously:

{:ok, pool} = JobsPool.start_link(10)
Enum.each 1..99, fn(_) ->
  spawn fn ->
    JobsPool.run!(pool, fn -> :timer.sleep(100) end)
  end
end
JobsPool.run!(pool, fn -> :timer.sleep(100) end)
JobsPool.join(pool)     # Should take ~1s

JobsPool.run!/4 blocks until the job is done and returns the job’s result, or reraise if it had an error.

You can also spawn jobs asynchronously:

{:ok, pool} = JobsPool.start_link(10)
Enum.each 1..100, fn(_) ->
  JobsPool.async(pool, fn -> :timer.sleep(100) end)
end
JobsPool.join(pool)     # Should take ~1s

There is currently no way to retrieve an async job’s result.

JobsPool.start_link/2 second argument is an array of options passed to GenServer.start_link/3. For example to create a named pool:

JobsPool.start_link(10, name: :jobs_pool)
JobsPool.run!(:jobs_pool, fn -> :timer.sleep(100) end)

Summary

async(server, fun, key \\ nil)

Execute fun asynchronously

join(server, timeout \\ :infinity)

Wait until all jobs are finished

run!(server, fun, key \\ nil, timeout \\ :infinity)

Execute fun and block until it’s complete, or timeout exceeded

start_link(max_concurrent_jobs, genserver_options \\ [])

Start a JobsPool server with max_concurrent_jobs execution slots

Functions

async(server, fun, key \\ nil)

Execute fun asynchronously.

key can be used to avoid running the same job multiple times, only one job with the same key can be executed or queued at any given time. If no key is given, a random one is generated.

Return the task key.

join(server, timeout \\ :infinity)

Wait until all jobs are finished.

run!(server, fun, key \\ nil, timeout \\ :infinity)

Execute fun and block until it’s complete, or timeout exceeded.

key can be used to avoid running the same job multiple times, only one job with the same key can be executed or queued at any given time. If no key is given, a random one is generated.

Return fun return value, or raise in the current process if it encountered an error.

start_link(max_concurrent_jobs, genserver_options \\ [])

Start a JobsPool server with max_concurrent_jobs execution slots.

genserver_options is passed to GenServer.start_link/3.