View Source FaktoryWorker.Testing (faktory_worker v1.9.7)

Test utilities for asserting and refuting whether jobs are enqueued, as well as performing jobs as they will be performed when pulled off the queue at runtime.

See Sandbox Testing to read more.

Summary

Functions

Assert that a job was enqueued for the given job module, optionally matching a set of argument/option filters. By default, this assertion will fail if more than one matching job is found. If you want to assert that multiple matching jobs are enqueued, you can use the :count option. If you want the assertion to pass if at least one matching job is enqueued, pass count: :any (this should generally be discouraged in favor of a specific count, to help catch accidental duplicate enqueues).

Perform the job in the same way that it will be performed when pulled from the queue at runtime. Specifically, this means

Assert that a job was not enqueued for the given job module, optionally specifying a set of argument/option filters.

Resets the queue history for all job modules.

Functions

Link to this function

assert_enqueued(worker_mod, filters \\ [])

View Source
@spec assert_enqueued(module(), args: list(), opts: Keyword.t()) :: true

Assert that a job was enqueued for the given job module, optionally matching a set of argument/option filters. By default, this assertion will fail if more than one matching job is found. If you want to assert that multiple matching jobs are enqueued, you can use the :count option. If you want the assertion to pass if at least one matching job is enqueued, pass count: :any (this should generally be discouraged in favor of a specific count, to help catch accidental duplicate enqueues).

Examples

defmodule MyApp.Test do
  use ExUnit.Case

  import FaktoryWorker.Testing

  setup :reset_queues

  test "enqueues" do
    MyApp.Job.perform_async("argument")

    # assert that _any_ job was enqueued for this module
    assert_enqueued MyApp.Job

    # assert that a specific job was enqueued
    assert_enqueued MyApp.Job, args: ["argument"]

    # assert on multiple arguments (order matters)
    assert_enqueued MyApp.Job, args: ["foo", "bar"]

    # assert that two matching jobs were enqueued
    assert_enqueued MyApp.Job, args: ["foo", "bar"], count: 2

    # assert on serializable arguments
    assert_enqueued MyApp.Job, args: [%MyApp.User{...}]

    # assert on options (only those explicitly passed to `perform_async`)
    assert_enqueued MyApp.Job, opts: [reserve_for: 1_500]

    # assert on multiple filters
    assert_enqueued MyApp.Job, args: ["foo", "bar"], opts: [custom: %{}]
  end
end
@spec describe_jobs([FaktoryWorker.Sandbox.job()]) :: String.t()
Link to this macro

perform_job(worker_mod, arg_or_args \\ [], opts \\ [])

View Source (macro)
@spec perform_job(module(), term() | [term()], Keyword.t()) :: Macro.t()

Perform the job in the same way that it will be performed when pulled from the queue at runtime. Specifically, this means:

  • work will be done in a separate process, under a Task.Supervisor
  • job arguments will go through serialization/deserialization

Blocks until the job has completed, returning the result of executing the perform function. If the job raises or exits, the return value will be {:error, "raise or exit reason"}. If the job times out (execution time exceeds the job's :reserve_for duration), {:error, :timeout} will be returned instead of the normal result.

Note that ExUnit.CaptureLog, ExUnit.CaptureIO, and friends will not work since the job process is not linked to the caller.

Examples

# arguments are optional for 0-arity jobs
perform_job(MyJob)

# single arguments may be passed directly
perform_job(MyJob, 123)

# multiple arguments in a list
perform_job(MyJob, ["foo", "bar"])

# you can pass options as well
perform_job(MyJob, ["arg1"], reserve_for: 1_500)

# if you need to pass options to a 0-arity job, you must
# pass an empty list for arguments
perform_job(MyJob, [], reserve_for: 1_500)
Link to this function

refute_enqueued(worker_mod, filters \\ [])

View Source
@spec refute_enqueued(module(), args: list(), opts: Keyword.t()) :: false

Assert that a job was not enqueued for the given job module, optionally specifying a set of argument/option filters.

Examples

defmodule MyApp.Test do
  use ExUnit.Case

  import FaktoryWorker.Testing

  setup :reset_queues

  test "doesn't enqueue" do
    # refute that _any_ job was enqueued for this module
    refute_enqueued MyApp.Job

    # refute that a specific job was enqueued
    refute_enqueued MyApp.Job, args: ["argument"]

    # refute on multiple arguments (order matters)
    refute_enqueued MyApp.Job, args: ["foo", "bar"]

    # refute on serializable arguments
    refute_enqueued MyApp.Job, args: [%MyApp.User{...}]

    # refute on options (only those explicitly passed to `perform_async`)
    refute_enqueued MyApp.Job, opts: [reserve_for: 1_500]

    # refute on multiple filters
    refute_enqueued MyApp.Job, args: ["foo", "bar"], opts: [custom: %{}]
  end
end
Link to this function

reset_queues(context \\ %{})

View Source

Resets the queue history for all job modules.

Generally, this should be called in the setup block for any tests that use assert_enqueued/2 or refute_enqueued/2, to ensure that test cases don't pollute each other.

This will reset the history for all job modules; if you need finer-grained control over which job modules are reset (generally you shouldn't), use FaktoryWorker.Sandbox.reset/1 instead.

If you don't need to perform any other setup, you can use the atom shorthand

setup :reset_queues