m25

Types

Reasons for a job failure.

pub type FailureReason {
  Errored
  HeartbeatTimeout
  JobTimeout
  Crash
}

Constructors

  • Errored

    The job handler returned an error value.

  • HeartbeatTimeout

    The job executor failed to respond to heartbeats.

  • JobTimeout

    The job timed out.

  • Crash

    The job worker crashed.
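
A minimal sketch of how the four constructors above might be turned into log messages. The helper name `describe_failure` is hypothetical and not part of M25; only the constructor names come from the type definition.

```gleam
import m25

/// Hypothetical helper: turn a failure reason into a log-friendly message.
pub fn describe_failure(reason: m25.FailureReason) -> String {
  case reason {
    m25.Errored -> "the handler returned an error value"
    m25.HeartbeatTimeout -> "the executor stopped responding to heartbeats"
    m25.JobTimeout -> "the job exceeded its timeout"
    m25.Crash -> "the worker crashed"
  }
}
```

Because the `case` lists every constructor, the compiler will flag this helper if a new failure reason is ever added.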

A background job to be executed.

pub opaque type Job(input)

Errors that can occur when cancelling a job.

pub type JobCancelError {
  JobCancelFetchError(JobRecordFetchError)
  InvalidState(JobStatus)
}

Constructors

  • JobCancelFetchError(JobRecordFetchError)

    The job could not be fetched from the database.

  • InvalidState(JobStatus)

    The job is not in a cancellable state.

pub type JobId {
  JobId(value: uuid.Uuid)
}

pub type JobRecord(input, output, error) {
  JobRecord(
    id: JobId,
    queue_name: String,
    created_at: timestamp.Timestamp,
    scheduled_at: option.Option(timestamp.Timestamp),
    input: input,
    reserved_at: option.Option(timestamp.Timestamp),
    started_at: option.Option(timestamp.Timestamp),
    cancelled_at: option.Option(timestamp.Timestamp),
    finished_at: option.Option(timestamp.Timestamp),
    timeout: duration.Duration,
    status: JobStatus,
    output: option.Option(output),
    deadline: option.Option(timestamp.Timestamp),
    latest_heartbeat_at: option.Option(timestamp.Timestamp),
    failure_reason: option.Option(FailureReason),
    error_data: option.Option(error),
    attempt: Int,
    max_attempts: Int,
    original_attempt_id: option.Option(JobId),
    previous_attempt_id: option.Option(JobId),
    retry_delay: duration.Duration,
    unique_key: option.Option(String),
  )
}

Reasons a job record could not be decoded when fetching from the database.

pub type JobRecordDecodeError {
  JobRecordFetchInvalidStatusError(JobId, String)
  JobRecordFetchInvalidFailureReason(JobId, String)
  JobRecordFetchJsonDecodeError(JobId, json.DecodeError)
}

An error that occurs when a job record cannot be fetched from the database.

pub type JobRecordFetchError {
  JobRecordFetchQueryError(pog.QueryError)
  JobRecordFetchDecodeErrors(List(JobRecordDecodeError))
  NoJobRecordFound
}
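
As a rough illustration, the three fetch errors can be told apart with a single `case`. The helper name `describe_fetch_error` and the message wording are assumptions made for this sketch, not part of M25.

```gleam
import gleam/int
import gleam/list
import m25

/// Hypothetical helper: summarise why a job record could not be fetched.
pub fn describe_fetch_error(error: m25.JobRecordFetchError) -> String {
  case error {
    // No row matched the requested ID
    m25.NoJobRecordFound -> "no job with that ID"
    // The query itself failed; the pog.QueryError is ignored here
    m25.JobRecordFetchQueryError(_) -> "the database query failed"
    // The row was found but one or more stored values could not be decoded
    m25.JobRecordFetchDecodeErrors(errors) ->
      int.to_string(list.length(errors)) <> " decode error(s) in the job record"
  }
}
```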

pub type JobStatus {
  Pending
  Reserved
  Executing
  Succeeded
  Failed
  Cancelled
}

Constructors

  • Pending
  • Reserved
  • Executing
  • Succeeded
  • Failed
  • Cancelled

pub opaque type M25

A queue to be used by M25. All of the input, output and error values must be serialisable to JSON so that they may be inserted into the database.

Note: the concurrency count is defined per running application using M25, and does not apply to the cluster as a whole.

pub type Queue(input, output, error) {
  Queue(
    name: String,
    max_concurrency: Int,
    input_to_json: fn(input) -> json.Json,
    input_decoder: decode.Decoder(input),
    output_to_json: fn(output) -> json.Json,
    output_decoder: decode.Decoder(output),
    error_to_json: fn(error) -> json.Json,
    error_decoder: decode.Decoder(error),
    handler_function: fn(input) -> Result(output, error),
    default_job_timeout: duration.Duration,
    poll_interval: Int,
    heartbeat_interval: Int,
    allowed_heartbeat_misses: Int,
    executor_init_timeout: Int,
    reserved_timeout: Int,
  )
}

Constructors

  • Queue(
      name: String,
      max_concurrency: Int,
      input_to_json: fn(input) -> json.Json,
      input_decoder: decode.Decoder(input),
      output_to_json: fn(output) -> json.Json,
      output_decoder: decode.Decoder(output),
      error_to_json: fn(error) -> json.Json,
      error_decoder: decode.Decoder(error),
      handler_function: fn(input) -> Result(output, error),
      default_job_timeout: duration.Duration,
      poll_interval: Int,
      heartbeat_interval: Int,
      allowed_heartbeat_misses: Int,
      executor_init_timeout: Int,
      reserved_timeout: Int,
    )

    Arguments

    name

    The name of the queue. This should be unique.

    max_concurrency

    The maximum number of jobs that can be running at once. Note: this is per application, not per cluster.

    input_to_json

    A function to encode the job input to JSON for insertion into the database.

    input_decoder

    A decoder to decode the JSON for the job input stored in the database.

    output_to_json

    A function to encode the successful output of the job to JSON for insertion into the database.

    output_decoder

    A decoder to decode the JSON for the job output stored in the database.

    error_to_json

    A function to encode the error of the job to JSON for insertion into the database.

    error_decoder

    A decoder to decode the JSON for the job error stored in the database.

    handler_function

    The handler function to process the job.

    default_job_timeout

    How long a job may run before it is considered failed.

    poll_interval

    How frequently, in milliseconds, the queue manager should check the database for new jobs.

    heartbeat_interval

    How frequently, in milliseconds, a heartbeat is sent to the individual job manager.

    allowed_heartbeat_misses

    How many times the heartbeat is allowed to be missed before the job is considered failed.

    executor_init_timeout

    How long, in milliseconds, a job executor actor may take to initialise.

    reserved_timeout

    How long a job can stay in the ‘reserved’ state before it should be reverted to ‘pending’.
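
To make the arguments above concrete, here is a minimal sketch of a queue whose input, output and error are all plain strings. The handler `send_email`, the queue name and every numeric value are illustrative assumptions, not recommended settings; the unit of `reserved_timeout` is not stated above, so that value in particular is a placeholder.

```gleam
import gleam/dynamic/decode
import gleam/json
import gleam/time/duration
import m25

/// Hypothetical handler: pretends to send an email to the given address.
fn send_email(address: String) -> Result(String, String) {
  case address {
    "" -> Error("empty address")
    _ -> Ok("sent to " <> address)
  }
}

/// A queue whose input, output and error values are all strings, so the
/// stock JSON encoder and decoder for strings can be reused for each.
pub fn email_queue() -> m25.Queue(String, String, String) {
  m25.Queue(
    name: "email",
    max_concurrency: 5,
    input_to_json: json.string,
    input_decoder: decode.string,
    output_to_json: json.string,
    output_decoder: decode.string,
    error_to_json: json.string,
    error_decoder: decode.string,
    handler_function: send_email,
    default_job_timeout: duration.minutes(5),
    poll_interval: 5000,
    heartbeat_interval: 1000,
    allowed_heartbeat_misses: 3,
    executor_init_timeout: 1000,
    // Placeholder value; the unit is an assumption
    reserved_timeout: 300_000,
  )
}
```

For richer input types, the same shape applies with a custom encoder function and a matching `decode.Decoder`.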

Values

pub fn add_queue(
  m25: M25,
  queue: Queue(input, output, error),
) -> Result(M25, Nil)

Register a queue to be used by M25. All of the input, output and error values must be serialisable to JSON so that they may be inserted into the database.

Returns Error(Nil) if a queue with the same name has already been registered.

pub fn main() {
  let assert Ok(m25) = m25.new(conn)
    |> m25.add_queue(queue1)
    |> result.try(m25.add_queue(_, queue2))
    |> result.try(m25.add_queue(_, queue3))

  // The second argument is queue_init_timeout; 10_000 is an illustrative value
  let assert Ok(_) = m25.start(m25, 10_000)
}

pub fn cancel_job(
  conn: pog.Connection,
  queue: Queue(input, output, error),
  id: JobId,
) -> Result(JobRecord(input, output, error), JobCancelError)

Cancel a job by its ID. Only a job in the Pending state can be cancelled; a job in any other state yields an InvalidState error.
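
A hedged sketch of handling each outcome of a cancellation. The helper name `cancel_and_report` and the returned messages are hypothetical; the error constructors are the ones listed under JobCancelError and JobRecordFetchError.

```gleam
import m25
import pog

/// Hypothetical helper: try to cancel a job and report the outcome as text.
pub fn cancel_and_report(
  conn: pog.Connection,
  queue: m25.Queue(input, output, error),
  id: m25.JobId,
) -> String {
  case m25.cancel_job(conn, queue, id) {
    Ok(_record) -> "cancelled"
    // The job had already been picked up and started
    Error(m25.InvalidState(m25.Executing)) -> "too late: already executing"
    // Any other non-pending status
    Error(m25.InvalidState(_status)) -> "not in a cancellable state"
    Error(m25.JobCancelFetchError(m25.NoJobRecordFound)) -> "no such job"
    Error(m25.JobCancelFetchError(_)) -> "the job could not be loaded"
  }
}
```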

pub fn enqueue(
  conn: pog.Connection,
  queue: Queue(input, output, error),
  job: Job(input),
) -> Result(JobRecord(input, output, error), JobRecordFetchError)

Enqueue a job to be executed as soon as a worker is available.
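
A minimal sketch of enqueuing a job and keeping only its ID, assuming a queue whose input type is String. The function name `enqueue_greeting` and the input value are illustrative.

```gleam
import m25
import pog

/// Hypothetical helper: enqueue a job with a fixed string input and
/// return the ID of the stored job record.
pub fn enqueue_greeting(
  conn: pog.Connection,
  queue: m25.Queue(String, output, error),
) -> Result(m25.JobId, m25.JobRecordFetchError) {
  let job = m25.new_job("hello")
  case m25.enqueue(conn, queue, job) {
    // The returned record carries the ID assigned to the new job
    Ok(record) -> Ok(record.id)
    Error(e) -> Error(e)
  }
}
```

The returned JobId can later be passed to get_job or cancel_job.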

pub fn get_job(
  conn: pog.Connection,
  queue: Queue(input, output, error),
  id: JobId,
) -> Result(JobRecord(input, output, error), JobRecordFetchError)

Get a job from the database by its ID.
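
As an illustration, a fetched record can be inspected through its `status` and `output` fields. This sketch assumes a queue whose output type is String; the helper name `job_outcome` and the message wording are hypothetical.

```gleam
import gleam/option.{None, Some}
import m25
import pog

/// Hypothetical helper: look up a job and describe where it currently stands.
pub fn job_outcome(
  conn: pog.Connection,
  queue: m25.Queue(input, String, error),
  id: m25.JobId,
) -> String {
  case m25.get_job(conn, queue, id) {
    Error(_) -> "the job could not be fetched"
    Ok(record) ->
      case record.status, record.output {
        m25.Succeeded, Some(text) -> "succeeded: " <> text
        m25.Succeeded, None -> "succeeded without stored output"
        m25.Failed, _ -> "failed"
        m25.Cancelled, _ -> "cancelled"
        m25.Pending, _ | m25.Reserved, _ | m25.Executing, _ ->
          "still in progress"
      }
  }
}
```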

pub fn new(conn: pog.Connection) -> M25

Create a new M25 instance. It’s recommended that you use a supervised pog connection.

let conn_name = process.new_name("db_connection")

let conn_child =
  pog.default_config(conn_name)
  |> pog.host("localhost")
  |> pog.database("my_database")
  |> pog.pool_size(15)
  |> pog.supervised

// Create a connection that can be accessed by our queue handlers
let conn = pog.named_connection(conn_name)

let m25 = m25.new(conn)

pub fn new_job(input: input) -> Job(input)

Create a new job with default values and the given input. The input must match the input type of the queue you’ll be enqueuing it to.

pub fn retry(
  job: Job(a),
  max_attempts max_attempts: Int,
  delay retry_delay: option.Option(duration.Duration),
) -> Job(a)

Configure retry behavior for a job. If no retry delay is provided, the job will be retried immediately.
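
A short sketch of both retry styles, using the `max_attempts` and `delay` labels from the signature. The helper names and the chosen values are illustrative; whether `max_attempts` counts the first run is not stated here, so treat the number 3 as a placeholder.

```gleam
import gleam/option.{None, Some}
import gleam/time/duration
import m25

/// Hypothetical helper: wait 30 seconds between attempts.
pub fn with_delayed_retry(job: m25.Job(a)) -> m25.Job(a) {
  m25.retry(job, max_attempts: 3, delay: Some(duration.seconds(30)))
}

/// Hypothetical helper: no delay given, so retries happen immediately.
pub fn with_immediate_retry(job: m25.Job(a)) -> m25.Job(a) {
  m25.retry(job, max_attempts: 3, delay: None)
}
```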

pub fn schedule(
  job: Job(a),
  at scheduled_at: timestamp.Timestamp,
) -> Job(a)

Schedule a job to be executed at a specific time. If that time is in the past, the job will be executed immediately.
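
For example, a job might be scheduled one hour from now by adding a duration to the current system time. The helper name `in_one_hour` is hypothetical; the timestamp and duration functions come from the gleam_time package.

```gleam
import gleam/time/duration
import gleam/time/timestamp
import m25

/// Hypothetical helper: schedule a job to run one hour from now.
pub fn in_one_hour(job: m25.Job(a)) -> m25.Job(a) {
  let run_at = timestamp.add(timestamp.system_time(), duration.hours(1))
  m25.schedule(job, at: run_at)
}
```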

pub fn start(
  m25: M25,
  queue_init_timeout: Int,
) -> Result(
  actor.Started(static_supervisor.Supervisor),
  actor.StartError,
)

Start M25 in an unsupervised fashion. This is not recommended. You should prefer using supervised to start M25 as part of your supervision tree.

pub fn supervised(
  m25: M25,
  queue_init_timeout: Int,
) -> supervision.ChildSpecification(static_supervisor.Supervisor)

Create a child spec for the M25 process, allowing it to be run as part of a supervision tree.
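
A minimal sketch of placing M25 under a gleam_otp static supervisor next to a database child. The function name `start_tree`, the `db_child` parameter and the 10_000 timeout value are assumptions for illustration; `db_child` stands in for a child specification such as the one produced by `pog.supervised`.

```gleam
import gleam/otp/static_supervisor
import gleam/otp/supervision
import m25

/// Hypothetical helper: start a supervisor that owns both the database
/// connection child and the M25 child.
pub fn start_tree(
  db_child: supervision.ChildSpecification(a),
  app: m25.M25,
) {
  static_supervisor.new(static_supervisor.OneForOne)
  // Added first so the connection child is started before M25
  |> static_supervisor.add(db_child)
  |> static_supervisor.add(m25.supervised(app, 10_000))
  |> static_supervisor.start
}
```

The restart strategy is a design choice for the host application; OneForOne is used here only as a simple default.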

pub fn timeout(
  job: Job(a),
  timeout timeout: duration.Duration,
) -> Job(a)

Add a timeout to a job that overrides the queue’s default timeout.

pub fn unique_key(job: Job(a), key unique_key: String) -> Job(a)

Set a unique key for a job. This will prevent the job being enqueued if it already exists in a non-errored state. If the only matching attempts have failed or crashed, the job can still be enqueued.
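
The job builder functions compose with the pipe operator. The sketch below combines a per-job timeout with a unique key; the function name `report_job`, the key format and the 10 minute timeout are illustrative assumptions.

```gleam
import gleam/time/duration
import m25

/// Hypothetical helper: build a job that is unique per customer and may
/// run for up to ten minutes, overriding the queue's default timeout.
pub fn report_job(customer_id: String) -> m25.Job(String) {
  m25.new_job(customer_id)
  |> m25.timeout(timeout: duration.minutes(10))
  |> m25.unique_key(key: "report-" <> customer_id)
}
```

Deriving the key from the input keeps duplicate requests for the same customer from piling up while an earlier attempt is still in a non-errored state.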
