m25
Types
Reasons for a job failure.
pub type FailureReason {
Errored
HeartbeatTimeout
JobTimeout
Crash
}
Constructors
-
Errored
The job handler returned an error value.
-
HeartbeatTimeout
The job executor failed to respond to heartbeats.
-
JobTimeout
The job timed out.
-
Crash
The job worker crashed.
Errors that can occur when cancelling a job.
pub type JobCancelError {
JobCancelFetchError(JobRecordFetchError)
InvalidState(JobStatus)
}
Constructors
-
JobCancelFetchError(JobRecordFetchError)
The job could not be fetched from the database.
-
InvalidState(JobStatus)
The job is not in a cancellable state.
pub type JobRecord(input, output, error) {
JobRecord(
id: JobId,
queue_name: String,
created_at: timestamp.Timestamp,
scheduled_at: option.Option(timestamp.Timestamp),
input: input,
reserved_at: option.Option(timestamp.Timestamp),
started_at: option.Option(timestamp.Timestamp),
cancelled_at: option.Option(timestamp.Timestamp),
finished_at: option.Option(timestamp.Timestamp),
timeout: duration.Duration,
status: JobStatus,
output: option.Option(output),
deadline: option.Option(timestamp.Timestamp),
latest_heartbeat_at: option.Option(timestamp.Timestamp),
failure_reason: option.Option(FailureReason),
error_data: option.Option(error),
attempt: Int,
max_attempts: Int,
original_attempt_id: option.Option(JobId),
previous_attempt_id: option.Option(JobId),
retry_delay: duration.Duration,
unique_key: option.Option(String),
)
}
Constructors
-
JobRecord( id: JobId, queue_name: String, created_at: timestamp.Timestamp, scheduled_at: option.Option(timestamp.Timestamp), input: input, reserved_at: option.Option(timestamp.Timestamp), started_at: option.Option(timestamp.Timestamp), cancelled_at: option.Option(timestamp.Timestamp), finished_at: option.Option(timestamp.Timestamp), timeout: duration.Duration, status: JobStatus, output: option.Option(output), deadline: option.Option(timestamp.Timestamp), latest_heartbeat_at: option.Option(timestamp.Timestamp), failure_reason: option.Option(FailureReason), error_data: option.Option(error), attempt: Int, max_attempts: Int, original_attempt_id: option.Option(JobId), previous_attempt_id: option.Option(JobId), retry_delay: duration.Duration, unique_key: option.Option(String), )
Arguments
- deadline
-
The deadline for the job to complete once it has started, based on the configured timeout.
Reasons a job record could not be decoded when fetching from the database.
pub type JobRecordDecodeError {
JobRecordFetchInvalidStatusError(JobId, String)
JobRecordFetchInvalidFailureReason(JobId, String)
JobRecordFetchJsonDecodeError(JobId, json.DecodeError)
}
Constructors
-
JobRecordFetchInvalidStatusError(JobId, String)
-
JobRecordFetchInvalidFailureReason(JobId, String)
-
JobRecordFetchJsonDecodeError(JobId, json.DecodeError)
An error that occurs when a job record can not be fetched from the database.
pub type JobRecordFetchError {
JobRecordFetchQueryError(pog.QueryError)
JobRecordFetchDecodeErrors(List(JobRecordDecodeError))
NoJobRecordFound
}
Constructors
-
JobRecordFetchQueryError(pog.QueryError)
-
JobRecordFetchDecodeErrors(List(JobRecordDecodeError))
-
NoJobRecordFound
pub type JobStatus {
Pending
Reserved
Executing
Succeeded
Failed
Cancelled
}
Constructors
-
Pending
-
Reserved
-
Executing
-
Succeeded
-
Failed
-
Cancelled
A queue to be used by M25. All of the input, output and error values must be serialisable to JSON so that they may be inserted into the database.
Note: the concurrency count is defined per running application using M25, and does not apply to the cluster as a whole.
pub type Queue(input, output, error) {
Queue(
name: String,
max_concurrency: Int,
input_to_json: fn(input) -> json.Json,
input_decoder: decode.Decoder(input),
output_to_json: fn(output) -> json.Json,
output_decoder: decode.Decoder(output),
error_to_json: fn(error) -> json.Json,
error_decoder: decode.Decoder(error),
handler_function: fn(input) -> Result(output, error),
default_job_timeout: duration.Duration,
poll_interval: Int,
heartbeat_interval: Int,
allowed_heartbeat_misses: Int,
executor_init_timeout: Int,
reserved_timeout: Int,
)
}
Constructors
-
Queue( name: String, max_concurrency: Int, input_to_json: fn(input) -> json.Json, input_decoder: decode.Decoder(input), output_to_json: fn(output) -> json.Json, output_decoder: decode.Decoder(output), error_to_json: fn(error) -> json.Json, error_decoder: decode.Decoder(error), handler_function: fn(input) -> Result(output, error), default_job_timeout: duration.Duration, poll_interval: Int, heartbeat_interval: Int, allowed_heartbeat_misses: Int, executor_init_timeout: Int, reserved_timeout: Int, )
Arguments
- name
-
The name of the queue. This should be unique.
- max_concurrency
-
The maximum number of jobs that can be running at once. Note: this is per application, not per cluster.
- input_to_json
-
A function to encode the input to JSON for input into the database.
- input_decoder
-
A decoder to decode the JSON for the job input stored in the database.
- output_to_json
-
A function to encode the successful output of the job to JSON for input into the database.
- output_decoder
-
A decoder to decode the JSON for the job output stored in the database.
- error_to_json
-
A function to encode the error of the job to JSON for input into the database.
- error_decoder
-
A decoder to decode the JSON for the job error stored in the database.
- handler_function
-
The handler function to process the job.
- default_job_timeout
-
How long a job may run before it is considered failed.
- poll_interval
-
How frequently the queue manager should check the database for new jobs in milliseconds.
- heartbeat_interval
-
How frequently to send heartbeat to the individual job manager in milliseconds.
- allowed_heartbeat_misses
-
How many times the heartbeat is allowed to be missed before the job is considered failed.
- executor_init_timeout
-
How long a job executor actor may take to initialise in milliseconds.
- reserved_timeout
-
How long a job can stay in the ‘reserved’ state before it should be reverted to ‘pending’.
Values
pub fn add_queue(
m25: M25,
queue: Queue(input, output, error),
) -> Result(M25, Nil)
Register a queue to be used by M25. All of the input, output and error values must be serialisable to JSON so that they may be inserted into the database.
Returns Error(Nil)
if a queue with the same name has already been registered.
pub fn main() {
let assert Ok(m25) = m25.new(conn)
|> m25.add_queue(queue1)
|> result.try(m25.add_queue(_, queue2))
|> result.try(m25.add_queue(_, queue3))
let assert Ok(_) = m25.start(m25)
}
pub fn cancel_job(
conn: pog.Connection,
queue: Queue(input, output, error),
id: JobId,
) -> Result(JobRecord(input, output, error), JobCancelError)
Cancel a job from the database by its ID. You can only cancel a job that is in the
Pending
state.
pub fn enqueue(
conn: pog.Connection,
queue: Queue(input, output, error),
job: Job(input),
) -> Result(JobRecord(input, output, error), JobRecordFetchError)
Enqueue a job to be executed as soon as a worker is available.
pub fn get_job(
conn: pog.Connection,
queue: Queue(input, output, error),
id: JobId,
) -> Result(JobRecord(input, output, error), JobRecordFetchError)
Get a job from the database by its ID.
pub fn new(conn: pog.Connection) -> M25
Create a new M25 instance. It’s recommended that you use a supervised pog
connection.
let conn_name = process.new_name("db_connection")
let conn_child =
pog.default_config(conn_name)
|> pog.host("localhost")
|> pog.database("my_database")
|> pog.pool_size(15)
|> pog.supervised
// Create a connection that can be accessed by our queue handlers
let conn = pog.named_connection(conn_name)
let m25 = m25.new(conn)
pub fn new_job(input: input) -> Job(input)
Create a new job with default values and the given input. The input must match the input type of the queue you’ll be enqueuing it to.
pub fn retry(
job: Job(a),
max_attempts max_attempts: Int,
delay retry_delay: option.Option(duration.Duration),
) -> Job(a)
Configure retry behavior for a job. If no retry delay is provided, the job will be retried immediately.
pub fn schedule(
job: Job(a),
at scheduled_at: timestamp.Timestamp,
) -> Job(a)
Schedule a job to be executed at a specific time. If that time is in the past, the job will be executed immediately.
pub fn start(
m25: M25,
queue_init_timeout: Int,
) -> Result(
actor.Started(static_supervisor.Supervisor),
actor.StartError,
)
Start M25 in an unsupervised fashion. This is not recommended. You should prefer
using supervised
to start M25 as part of your supervision tree.
pub fn supervised(
m25: M25,
queue_init_timeout: Int,
) -> supervision.ChildSpecification(static_supervisor.Supervisor)
Create a child spec for the M25 process, allowing it to be run as part of a supervision tree.
pub fn timeout(
job: Job(a),
timeout timeout: duration.Duration,
) -> Job(a)
Add a timeout to a job that overrides the queue’s default timeout.