Reactor.Executor.ConcurrencyTracker (reactor v0.17.0)

Manage shared concurrency pools for multiple Reactors.

When running a Reactor you can pass the concurrency_key option, which will cause the Reactor to use the specified pool to ensure that the combined Reactors never exceed the pool's available concurrency limit.

This avoids nested Reactors spawning too many workers and thrashing the system.
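As a rough sketch of how a shared pool might be passed to a run: the script below allocates a pool and hands its key to Reactor.run/4 via the concurrency_key option. GreetReactor and its step are hypothetical and exist only for illustration. The script assumes it is run standalone, so that Mix.install/1 fetches :reactor and starts its application (and with it the tracker process).

```elixir
# Assumption: run as a standalone script; Mix.install/1 fetches and starts :reactor.
Mix.install([{:reactor, "~> 0.17.0"}])

defmodule GreetReactor do
  # Hypothetical single-step Reactor, used only for illustration.
  use Reactor

  input :whom

  step :greet do
    argument :whom, input(:whom)
    run fn %{whom: whom}, _context -> {:ok, "Hello, #{whom}!"} end
  end

  return :greet
end

alias Reactor.Executor.ConcurrencyTracker

# One pool, shared by every Reactor that is given this key.
pool = ConcurrencyTracker.allocate_pool(4)

result = Reactor.run(GreetReactor, %{whom: "world"}, %{}, concurrency_key: pool)
IO.inspect(result, label: "result")

# The caller allocated the pool, so the caller releases it.
ConcurrencyTracker.release_pool(pool)
```

Passing the same key to several runs, including nested ones, is what lets them draw from a single limit instead of each creating its own.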

The process calling allocate_pool/1 is monitored, and when it terminates its allocation is removed. Any processes that are still using that pool will not be able to allocate any new resources from it.
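The monitoring behaviour can be observed with a short-lived owner process. In this sketch a Task allocates the pool and then exits, and the script polls status/1 until the pool is gone. PoolWatcher is a hypothetical helper, and the polling interval and attempt count are arbitrary. Removal happens asynchronously, which is why the sketch polls rather than checking once. As above, it assumes the tracker is running because the :reactor application has been started.

```elixir
# Assumption: run as a standalone script; Mix.install/1 fetches and starts :reactor.
Mix.install([{:reactor, "~> 0.17.0"}])

alias Reactor.Executor.ConcurrencyTracker

defmodule PoolWatcher do
  # Hypothetical helper: poll status/1 until the pool disappears or attempts run out.
  def await_removal(_pool, 0), do: :still_present

  def await_removal(pool, attempts) do
    case Reactor.Executor.ConcurrencyTracker.status(pool) do
      {:error, _reason} ->
        :removed

      {:ok, _available, _limit} ->
        Process.sleep(10)
        await_removal(pool, attempts - 1)
    end
  end
end

# The task process calls allocate_pool/1, so it is the process being monitored.
pool =
  fn -> ConcurrencyTracker.allocate_pool(2) end
  |> Task.async()
  |> Task.await()

# The task has returned its result and exits; wait for the tracker to notice.
outcome = PoolWatcher.await_removal(pool, 100)
IO.inspect(outcome, label: "pool after owner exit")
```

In practice this means a pool should be allocated by a process that outlives every Reactor using it.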

Summary

Functions

acquire(key, how_many \\ 1)

Attempt to acquire a number of concurrency allocations from the pool.

allocate_pool(concurrency_limit)

Allocate a new concurrency pool and set the maximum limit.

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

release(key, how_many \\ 1)

Release concurrency allocation back to the pool.

release_pool(pool_key)

Release the concurrency pool.

status(key)

Report the available and maximum concurrency for a pool.

Types

pool_key()

@type pool_key() :: reference()

record()

@type record() ::
  {pool_key(), concurrency_limit :: pos_integer(),
   available_slots :: non_neg_integer(), allocator :: pid()}

Functions

acquire(key, how_many \\ 1)

@spec acquire(pool_key(), how_many :: pos_integer()) :: {:ok, non_neg_integer()}

Attempt to acquire a number of concurrency allocations from the pool.

Returns {:ok, n} where n is the number of slots that were actually allocated. It's important to note that whilst you may request 16 slots, if only 3 are available then this function will return {:ok, 3} and you must abide by it.

This function can return {:ok, 0} if no slots are available.
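A minimal sketch of the partial-grant behaviour described above, using a pool of 3 and a request for 16. It assumes the tracker process is already running, here because Mix.install/1 starts the :reactor application. The variable names are illustrative.

```elixir
# Assumption: run as a standalone script; Mix.install/1 fetches and starts :reactor.
Mix.install([{:reactor, "~> 0.17.0"}])

alias Reactor.Executor.ConcurrencyTracker

pool = ConcurrencyTracker.allocate_pool(3)

# Ask for far more than the pool holds; only the granted count may be used.
{:ok, granted} = ConcurrencyTracker.acquire(pool, 16)
IO.inspect(granted, label: "granted")

# The pool is now exhausted, so a further request for one slot is granted nothing.
{:ok, none_left} = ConcurrencyTracker.acquire(pool)
IO.inspect(none_left, label: "granted on second request")

# Hand back exactly what was granted (release/2 expects a positive count).
if granted > 0, do: ConcurrencyTracker.release(pool, granted)

ConcurrencyTracker.release_pool(pool)
```

Callers should size their work from the returned count rather than from the count they requested, and release only what they were granted.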

allocate_pool(concurrency_limit)

@spec allocate_pool(non_neg_integer()) :: pool_key()

Allocate a new concurrency pool and set the maximum limit.

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

See Supervisor.

release(key, how_many \\ 1)

@spec release(pool_key(), how_many :: pos_integer()) :: :ok | :error

Release concurrency allocation back to the pool.

release_pool(pool_key)

@spec release_pool(pool_key()) :: :ok

Release the concurrency pool.

This deletes the pool but does not affect any processes currently using it. Users of the pool key can no longer acquire resources from it.
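To illustrate, the sketch below checks status/1 before and after releasing a pool. It assumes the tracker is running (the :reactor application is started by Mix.install/1) and does not depend on the exact error reason, which the spec leaves as any().

```elixir
# Assumption: run as a standalone script; Mix.install/1 fetches and starts :reactor.
Mix.install([{:reactor, "~> 0.17.0"}])

alias Reactor.Executor.ConcurrencyTracker

pool = ConcurrencyTracker.allocate_pool(2)

# While the pool exists, status/1 reports its available and maximum concurrency.
before_release = ConcurrencyTracker.status(pool)
IO.inspect(before_release, label: "before release_pool/1")

:ok = ConcurrencyTracker.release_pool(pool)

# Once deleted, the key no longer resolves to a pool.
after_release = ConcurrencyTracker.status(pool)
IO.inspect(after_release, label: "after release_pool/1")
```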

status(key)

@spec status(pool_key()) :: {:ok, available, limit} | {:error, any()}
  when available: non_neg_integer(), limit: pos_integer()

Report the available and maximum concurrency for a pool.
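As a small illustration, status/1 can be used to watch the available count change as slots are acquired. This sketch assumes the tracker is running (the :reactor application is started by Mix.install/1). The variable names are illustrative.

```elixir
# Assumption: run as a standalone script; Mix.install/1 fetches and starts :reactor.
Mix.install([{:reactor, "~> 0.17.0"}])

alias Reactor.Executor.ConcurrencyTracker

pool = ConcurrencyTracker.allocate_pool(4)

# A fresh pool has every slot available.
{:ok, available, limit} = ConcurrencyTracker.status(pool)
IO.inspect({available, limit}, label: "fresh pool")

# Taking one slot lowers the available count; the limit is unchanged.
{:ok, 1} = ConcurrencyTracker.acquire(pool)
{:ok, after_acquire, _limit} = ConcurrencyTracker.status(pool)
IO.inspect(after_acquire, label: "available after acquire/1")

ConcurrencyTracker.release(pool)
ConcurrencyTracker.release_pool(pool)
```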