Job (ci v0.1.1) View Source
Managed execution of potentially failing actions.
A job is a logical unit of work which is split into multiple actions, where each action is running in its own separate process, which is a child of the job process.
Basic sketch
Job.start_link(fn ->
# this is the root action
{:ok, action1} = Job.start_action(fn ->
# ...
end)
{:ok, action2} = Job.start_action(fn ->
result_of_action_3 =
Job.run(fn ->
end)
# ...
end)
{Job.await(action1), Job.await(action2)}
end)
Comparison to tasks
Job is somewhat similar to Task
, with the main difference that it manages the execution of its
actions. The job process stops only after all of its actions have terminated, which is a
guarantee not provided by Task
.
In addition, Job
makes some different decisions. Most notably, a crash of an action is
automatically converted into an error result, which simplifies the implementation of custom crash
logic (e.g. error reporting).
If slight asynchronism during termination can be tolerated, and there's no need to implement
custom crash logic, Task
is likely a better option.
Actions
Each job runs one or more actions, which are technically child processes of the job process.
A job is started with the root action. When the root action stops, the job process will also
stop with the same exit reason. This is the only case where Job
propagates a process exit.
In its simplest form, an action can be a zero-arity function or an MFA tuple. In both cases,
the action process will be powered by Task
. You can also power the action process by your own
module. See "Custom action processes" for details.
Each action can start additional actions. Logically, such action is treated as the child of the action which started it. If the parent action terminates, its children will also be taken down.
Technically every action process will be running as a direct child of the job process. This decision is made to simplify the process tree and reduce the amount of running processes.
You can start additional actions with start_action/2
or run_action/2
. If start_action/2
is
used, the action result can be awaited on with await/1
. These functions may only be invoked
inside an action process.
In case of Task
-powered jobs, the action result is the return value of the invoked function. If
the action process crashes, the result will be {:exit, exit_reason}
. To avoid ambiguity, it is
recommended to return ok/error tuples from the actions.
Custom action processes
To power the action by a custom logic (e.g. GenServer
), you need to provide the action
spec factory function to start_action/2
or run_action/2
:
Job.start_action(
fn responder ->
child_spec(responder)
end
)
The factory function takes the responder, which is an arity one anonymous function. The function
should return a child specification (Parent.start_spec/0
) which describes how to start the
action process.
The action process needs to invoke responder.(action_response)
to send the response back to
its caller. This function may only be invoked inside the action process. As soon as this function
is invoked, the action process must stop with the reason :normal
.
If the action process is stopping with an abnormal exit reason, it shouldn't invoke the responder function. Doing this will lead to duplicate response message in the mailbox of the parent action.
For example of custom actions, see OsCmd
and Job.Pipeline
.
Internals
The job process is powered by Parent
, with all actions running as its children. Logical
hierarchy (parent-child relationship between actions) is modeled via bound siblings feature
of Parent
.
Link to this section Summary
Functions
Awaits for the job or the action response.
Returns a child specification for inserting a job into the supervision tree.
Starts the job with start_link/2
, and awaits for it with await/1
.
Starts a child action and awaits for its response.
Starts a child action.
Starts the job process and the root action.
Link to this section Types
Specs
action() :: action_fun_or_mfa() | {action_fun_or_mfa(), [action_opt()]} | (responder() -> {Parent.start_spec(), [action_opt()]})
Specs
Specs
Specs
responder() :: (response() -> :ok)
Specs
response() :: any()
Specs
start_opt() :: {:respond_to, pid()} | {:name, GenServer.name()} | action_opt()
Link to this section Functions
Specs
await(GenServer.server()) :: response() | {:exit, reason :: any()}
Awaits for the job or the action response.
This function must be invoked in the process which receives the response message. In the case of
the job, this is the process specified with the :respond_to
start option. When awaiting on the
action, this is the logical parent action (i.e. the process that started the action).
This function will await indefinitely. There's no support for client-side timeout. Instead, you
can use the :timeout
action option to limit the duration of an action.
If the awaited job or action crashes, the response will be {:exit, exit_reason}
. To avoid
ambiguity, it is recommended to return ok/error tuples from each action.
Specs
child_spec({action(), [start_opt()]} | action()) :: Parent.child_spec()
Returns a child specification for inserting a job into the supervision tree.
Specs
Starts the job with start_link/2
, and awaits for it with await/1
.
Specs
run_action(action(), [action_opt()]) :: response() | {:exit, reason :: any()}
Starts a child action and awaits for its response.
Specs
start_action(action(), [action_opt()]) :: Parent.on_start_child()
Starts a child action.
See module documentation for details on action specification.
The started action process will logically be treated as the child of the caller process. If the caller process terminates, the started process will be taken down as well.
Unlike the root action, a child action always sends response to its caller. You can await this
response with await/1
.
This function can only be invoked inside an action process.
Options
:timeout
- Maximum duration of the action. If the action exceeds the given duration, it will be forcefully taken down. Defaults to:infinity
.:telemetry_id
- If provided, telemetry start and stop events will be emitted. This option must be a list. Job will append:start
and:stop
atoms to this list to emit the events.:telemetry_meta
- Additional metadata to send with telemetry events. If not provided, an empty map is used.
Specs
start_link(action(), [start_opt()]) :: GenServer.on_start()
Starts the job process and the root action.
Options
:respond_to
- a pid of the process that will receive the result of the root action. The target process can await on the result withawait/1
. If this option is not provided, the response will not be sent.name
- Registered name of the process. If not provided, the process won't be registered.- action option - see
start_action/2
for details.