bg_jobs - Background job processing for your Gleam app


Docs are WIP right now

Most of it is correct but not complete, some parts are duplicated.

What is bg_jobs

When developing web applications, certain tasks, such as processing large files or performing heavy calculations, can slow down regular request handling. bg_jobs helps by moving these time-consuming operations to a background queue, keeping your application responsive.

This library is SQL-driven, with adapters for SQLite and Postgres included, so you don’t need extra dependencies to get background processing up and running. Just connect to your existing database, set up your background jobs, and your app can handle more demanding tasks without impacting user experience.

bg_jobs provides the following:

Time and date

All dates should be passed in UTC+0.

Installation

gleam add bg_jobs

Example usage

A complete example setup can be found in the /example directory.

import bg_jobs
import bg_jobs/jobs
import bg_jobs/queue
// Module path assumed from the adapter name used below
import bg_jobs/sqlite_db_adapter
import gleam/erlang/process
import gleam/io
import gleam/otp/static_supervisor

fn example_worker() {
  jobs.Worker(job_name: "example_job", handler: fn(job: jobs.Job) {
    io.print(job.payload)
    // Workers return a result instead of panicking
    Ok(Nil)
  })
}

pub fn main() {
  let conn = todo as "the sqlight connection"

  // build() is assumed to return a Result, as in setup_bg_jobs further down
  let assert Ok(bg) =
    static_supervisor.new(static_supervisor.OneForOne)
    |> bg_jobs.new(sqlite_db_adapter.new(conn, []))
    |> bg_jobs.with_queue(
      queue.new("default_queue")
      |> queue.with_worker(example_worker()),
    )
    |> bg_jobs.build()

  // Dispatch a new job
  jobs.new(example_worker().job_name, "Hello!")
  |> bg_jobs.enqueue(bg)

  process.sleep_forever()
}

This example sets up a default_queue to handle example_worker jobs. When a job with the payload “Hello!” is dispatched, it is stored in the database, picked up by the queue, and logged.

Architecture

TODO: expand this section

Program architecture:

Durability

TODO: update, this is no longer exactly correct. Jobs are claimed with a timestamp and the name of the queue or scheduled job. Each queue and scheduled job should have a unique name, even across machines. Whenever the worker starts up, it clears out previously claimed jobs. This way, if a worker crashes and the supervisor restarts it, there are no abandoned jobs.

Supervisor Integration in bg_jobs

bg_jobs is an OTP-based job scheduling system that integrates with Erlang’s supervision tree. When setting up bg_jobs, you need to pass a static_supervisor.Builder along with a db_adapter. This provides flexibility, allowing you to configure the supervisor’s settings before passing it into bg_jobs. You can also choose to supervise the created supervisor itself.

Configuring the Supervisor for bg_jobs

To set custom parameters for the bg_jobs supervisor, you need to first create the supervisor with the desired strategy and configuration settings.

Example 1: Setting restart_tolerance for the bg_jobs Supervisor

You can configure the restart_tolerance setting for the supervisor, which determines how many restarts are tolerated in a given time window before the supervisor itself is terminated.

static_supervisor.new(static_supervisor.OneForOne)   // Create a new supervisor with the OneForOne strategy
|> static_supervisor.restart_tolerance(10, 1)  // Set restart tolerance to 10 restarts in 1 second
|> bg_jobs.new(db_adapter)  // Create a new bg_jobs instance with the specified db_adapter
|> bg_jobs.build()  // Build and start the bg_jobs supervision tree

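In this example, the supervisor uses the OneForOne strategy and tolerates up to 10 restarts within 1 second before it is itself terminated.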

Example 2: Supervising the bg_jobs Supervisor

You can also add the bg_jobs supervisor as a child to another supervisor, allowing you to control its lifecycle and supervise it as part of your system.

fn setup_bg_jobs() -> Result(bg_jobs.BgJobs, errors.BgJobError) {
  // Define how the bg_jobs supervisor is set up
  todo
}

static_supervisor.new(static_supervisor.OneForOne)   // Create a new supervisor with the OneForOne strategy
|> static_supervisor.add(static_supervisor.supervisor_child("bg_jobs", fn() {  // Add the bg_jobs supervisor as a child
  setup_bg_jobs()  // Set up bg_jobs
    |> result.map(fn(bg) { bg.supervisor })  // Map the result to return the supervisor's pid
}))

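In this example, setup_bg_jobs builds the bg_jobs supervision tree, and the outer supervisor starts it as a child named "bg_jobs" under the OneForOne strategy.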

Queue

Each queue that you add to the bg_jobs program spawns an Erlang/OTP actor that manages that queue. The actor is responsible for polling the database for jobs it has workers for.

A worker is added to a queue using the queue.with_worker(Queue, Worker) function. Jobs are added to the database by calling bg_jobs.enqueue(JobRequest).

When a queue actor finds a job it can handle, it claims that job by setting the reserved_at and reserved_by columns in the database to the current datetime (UTC+0) and the current process's pid. This ensures that no other queue claims that job.
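The claiming rule can be sketched in memory as follows. This is only an illustration: JobRow and claim are hypothetical names, the claimer is a plain string rather than a real pid, and an actual adapter would presumably perform the check and the update atomically in the database.

```gleam
import gleam/option.{type Option, None, Some}

/// Hypothetical in-memory stand-in for a row in the jobs table.
pub type JobRow {
  JobRow(
    id: String,
    name: String,
    reserved_at: Option(String),
    reserved_by: Option(String),
  )
}

/// Claim the job if nobody has reserved it yet; otherwise reject the claim.
pub fn claim(job: JobRow, now: String, claimer: String) -> Result(JobRow, Nil) {
  case job.reserved_at {
    None ->
      Ok(JobRow(..job, reserved_at: Some(now), reserved_by: Some(claimer)))
    Some(_) -> Error(Nil)
  }
}

pub fn main() {
  let job =
    JobRow(id: "1", name: "example_job", reserved_at: None, reserved_by: None)
  // The first claim succeeds and stamps the row.
  let assert Ok(claimed) = claim(job, "2024-01-01T00:00:00Z", "claimer_a")
  // A second claim on the stamped row is rejected.
  let assert Error(Nil) = claim(claimed, "2024-01-01T00:00:01Z", "claimer_b")
}
```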

It then spawns a new process and executes the job's worker in that process, using the job data from the database.

The worker should not panic but instead return a result. If the worker panics, the queue actor will crash and the supervisor will try to restart it. If the job execution results in an Ok, a message is sent back to the queue actor, and the actor moves the job from the jobs table to the jobs_succeeded table.

However, if the job execution results in an error, it is retried as many times as specified by queue.with_max_retries (default: 3). Should the job not succeed within max_retries, a message is sent back to the queue actor, and the actor moves the job from the jobs table to the jobs_failed table, with the string provided in the error stored as the exception.
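A minimal sketch of this success and retry flow is shown below. Outcome, run_with_retries and attempt are hypothetical names, not part of bg_jobs, and the sketch assumes that the limit counts total attempts; the library may count retries differently.

```gleam
/// Hypothetical stand-in for where the job ends up.
pub type Outcome {
  /// Corresponds to moving the job to jobs_succeeded.
  Succeeded(attempts: Int)
  /// Corresponds to moving the job to jobs_failed with the error string.
  Failed(attempts: Int, exception: String)
}

/// Run the handler until it returns Ok or the attempt budget is used up.
pub fn run_with_retries(
  handler: fn() -> Result(Nil, String),
  max_attempts: Int,
) -> Outcome {
  attempt(handler, 1, max_attempts)
}

fn attempt(
  handler: fn() -> Result(Nil, String),
  attempt_no: Int,
  max_attempts: Int,
) -> Outcome {
  case handler() {
    Ok(Nil) -> Succeeded(attempt_no)
    Error(reason) ->
      case attempt_no >= max_attempts {
        // Budget exhausted: keep the last error as the exception.
        True -> Failed(attempt_no, reason)
        False -> attempt(handler, attempt_no + 1, max_attempts)
      }
  }
}

pub fn main() {
  let assert Succeeded(attempts: 1) = run_with_retries(fn() { Ok(Nil) }, 3)
  let assert Failed(attempts: 3, exception: "boom") =
    run_with_retries(fn() { Error("boom") }, 3)
}
```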

Multiple queues may have the same workers meaning multiple queues may be able to process the same type of jobs. This can be used to prioritize job execution.

For example, if one type of job is especially important, you could let all queues process that job type by adding that job's worker to every queue. Or you could have a default queue that can handle all jobs and add another queue dedicated to a specific job.
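To illustrate how sharing workers affects which queues can pick up a job, here is a small model. QueueModel and queues_able_to_run are hypothetical names used only for this sketch, and the job names are made up.

```gleam
import gleam/list

/// Hypothetical model of a queue: its name and the job names it has workers for.
pub type QueueModel {
  QueueModel(name: String, job_names: List(String))
}

/// Names of all queues that have a worker for the given job name.
pub fn queues_able_to_run(
  queues: List(QueueModel),
  job_name: String,
) -> List(String) {
  queues
  |> list.filter(fn(q) { list.contains(q.job_names, job_name) })
  |> list.map(fn(q) { q.name })
}

pub fn main() {
  let queues = [
    QueueModel("default_queue", ["send_email", "resize_image"]),
    QueueModel("priority_queue", ["send_email"]),
  ]
  // The important job type can be processed by both queues.
  let assert ["default_queue", "priority_queue"] =
    queues_able_to_run(queues, "send_email")
  // Other jobs are only handled by the default queue.
  let assert ["default_queue"] = queues_able_to_run(queues, "resize_image")
}
```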

Jobs can have delayed processing, either at a specific time or at a relative time in the future. This can be specified with bg_jobs.job_with_available_at or job_with_available_in, respectively.
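The relationship between the two options can be sketched with plain millisecond timestamps. The function names below are hypothetical and do not reflect the library's actual types, which are not shown here.

```gleam
/// Turn a relative delay into an absolute availability time (both in ms).
pub fn available_at_from_delay(now_ms: Int, delay_ms: Int) -> Int {
  now_ms + delay_ms
}

/// A job may be picked up once its availability time has been reached.
pub fn is_available(available_at_ms: Int, now_ms: Int) -> Bool {
  available_at_ms <= now_ms
}

pub fn main() {
  let now = 1_000_000
  // "Available in 30 seconds" becomes an absolute time.
  let available_at = available_at_from_delay(now, 30_000)
  let assert False = is_available(available_at, now)
  let assert True = is_available(available_at, now + 30_000)
}
```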

Scheduled job

Scheduled jobs work the same way as queues internally with some key differences.

Each scheduled job has exactly one worker, meaning it can only handle one type of job.

A scheduled job is not manually enqueued. Instead, on startup and after each successful processing of a job, the next processing time is calculated and the job is enqueued with the available_at value set to that next processing time.

This also means that if the job execution overlaps with what would have been the next processing time, the job won't run at that time.

For example, if a job starts at 13:30 and should run every minute, the next run time would be 13:31. But if the job execution takes 1 min 30 sec, the next run will be at 13:32, since the job is only rescheduled after it completes.
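The arithmetic behind this example can be checked with a small sketch. It assumes an "every minute" schedule aligned to whole minutes and works in seconds since midnight; at and next_minute_after are hypothetical helpers, not library functions.

```gleam
/// Seconds since midnight for a wall-clock time.
pub fn at(hour: Int, minute: Int, second: Int) -> Int {
  hour * 3600 + minute * 60 + second
}

/// First whole-minute boundary strictly after the time the job finished.
pub fn next_minute_after(finished_at: Int) -> Int {
  { finished_at / 60 + 1 } * 60
}

pub fn main() {
  let started = at(13, 30, 0)
  // A job that finishes within the same minute is rescheduled for 13:31.
  let assert True = next_minute_after(started + 5) == at(13, 31, 0)
  // A job taking 1 min 30 sec finishes at 13:31:30, so 13:31 is skipped.
  let assert True = next_minute_after(started + 90) == at(13, 32, 0)
}
```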

Schedules

Schedules for scheduled jobs can be specified in two ways: as an interval or as a cron-like schedule.

Scheduled jobs are either executing or waiting to be executed. Since the job’s next run is calculated on startup, the first thing it does is wait. However, if there already is a scheduled job when the actor starts, it will do nothing and wait for that job to become available.

Intervals

Intervals are the simplest form of schedule provided. Intervals can be specified in milliseconds, seconds, minutes, hours, days or weeks. This is done using the corresponding scheduled_job.new_interval_[time unit]() functions; for example, scheduled_job.new_interval_minutes(20) would create a new interval of 20 minutes. Execution time is not included in the interval. This means that what you are really setting is how far in the future the job should be scheduled.
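As a rough sketch of what "execution time is not included" means, the next availability time can be modelled as the finish time plus the interval. The Interval type and both functions below are hypothetical stand-ins written for this illustration.

```gleam
/// Hypothetical model of the supported interval units.
pub type Interval {
  Milliseconds(Int)
  Seconds(Int)
  Minutes(Int)
  Hours(Int)
  Days(Int)
  Weeks(Int)
}

pub fn to_milliseconds(interval: Interval) -> Int {
  case interval {
    Milliseconds(n) -> n
    Seconds(n) -> n * 1000
    Minutes(n) -> n * 60 * 1000
    Hours(n) -> n * 60 * 60 * 1000
    Days(n) -> n * 24 * 60 * 60 * 1000
    Weeks(n) -> n * 7 * 24 * 60 * 60 * 1000
  }
}

/// The interval is counted from when the previous run finished.
pub fn next_available_at(finished_at_ms: Int, interval: Interval) -> Int {
  finished_at_ms + to_milliseconds(interval)
}

pub fn main() {
  // A run that starts at t = 0 and takes 5 minutes finishes at 300_000 ms.
  let finished_at = 5 * 60 * 1000
  // With a 20 minute interval the next run is 25 minutes after the first start.
  let assert True =
    next_available_at(finished_at, Minutes(20)) == 25 * 60 * 1000
}
```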

TODO: diagram

Schedules

Sometimes you need more advanced schedules. For example, you may want a job to run on the first day of the month at 08:30, or on Thursdays between the months of March and June.

This can be achieved using schedules. Schedules allow you to specify cron-like schedules using ranges or specific values.

To achieve this flexibility, schedules are defined by configuring the following time components:

Each component can be set to one of the following:

Every: Matches all possible values for that component.

List: A combination of specific values and ranges.

Behavior of Every

Setting a component to Every means it matches all possible values for that component. For example:

Setting hours to Every will trigger the job at any hour, as long as other time units match their criteria.
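The difference between Every and a list of values can be modelled as below. Matcher, Component, Listed and component_matches are hypothetical names for this sketch, and treating ranges as inclusive at both ends is an assumption.

```gleam
import gleam/list

/// Hypothetical entry in a component's list: one value or a range.
pub type Matcher {
  Value(Int)
  Range(from: Int, to: Int)
}

/// Hypothetical model of a single time component such as hours.
pub type Component {
  Every
  Listed(List(Matcher))
}

/// Every matches anything; a list matches if any of its entries match.
pub fn component_matches(component: Component, value: Int) -> Bool {
  case component {
    Every -> True
    Listed(matchers) ->
      list.any(matchers, fn(matcher) {
        case matcher {
          Value(expected) -> expected == value
          // Assumption: both ends of the range are included.
          Range(from, to) -> from <= value && value <= to
        }
      })
  }
}

pub fn main() {
  let hours = Listed([Value(8), Range(14, 16)])
  let assert True = component_matches(Every, 3)
  let assert True = component_matches(hours, 8)
  let assert True = component_matches(hours, 15)
  let assert False = component_matches(hours, 9)
}
```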

ScheduleBuilder Functions

The ScheduleBuilder provides a series of methods to configure each time component.

Creating a New Schedule

pub fn new_schedule() -> ScheduleBuilder

Creates a new schedule with the default configuration: triggers at the first second of every minute.

Combining Filters

Filters across time units are combined using AND logic. For example:

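let schedule =
  scheduled_job.new_schedule()
  |> scheduled_job.on_second(1)
  |> scheduled_job.between_seconds(10, 15)
  |> scheduled_job.on_minute(30)
  |> scheduled_job.on_hour(8)
  |> scheduled_job.between_hours(14, 16)
  |> scheduled_job.on_thursdays()
  |> scheduled_job.between_months(3, 6)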

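This schedule will trigger only when all of the following hold: the second is 1 or in the range 10 to 15, the minute is 30, the hour is 8 or in the range 14 to 16, the day is a Thursday, and the month is in the range March to June.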

Summary

The ScheduleBuilder provides a powerful and flexible way to define schedules:

Old docs..

Core Components

BgJobs

TODO: rewrite

BgJobs is the main entry point for setting up background job processing. It starts the OTP supervision tree with all the queues, scheduled jobs and monitor actors that were specified.

Setup example:

  let bg = 
    static_supervisor.new(static_supervisor.OneForOne)
    |> bg_jobs.new(db_adapter)
    // Event listeners
    |> bg_jobs.with_event_listener(logger_event_listener.listener)
    // Queues
    |> bg_jobs.with_queue(queue.new("default_queue"))
    // Scheduled jobs 
    |> bg_jobs.with_scheduled_job(scheduled_job.new(
      worker: cleanup_db_job.worker(),
      schedule: scheduled_job.interval_minutes(1),
    )) 
    |> bg_jobs.build()

Queues

TODO: rewrite

Queues handle the processing of jobs in the background. They poll the database at a specified interval and pick up jobs that are ready for processing. Jobs are picked up by the appropriate queue actor based on the job name, and each queue can be customized for job concurrency, polling interval, retries, and worker configuration.

Queue setup example:

  queue.new("example_queue")
  |> queue.with_worker(...)
  |> queue.build()

Scheduled jobs

TODO: rewrite

Scheduled jobs are jobs that run on a predefined schedule, either at a fixed interval (e.g., every 5 minutes) or on a more complex schedule (e.g., cron-like schedules). They self-manage their scheduling, calculating the next run time after each execution. A scheduled job becomes available once it reaches the specified time, but when it actually runs may depend on queue availability.

Scheduled job options:

Interval scheduled job:

// Interval-based job that runs every minute
scheduled_job.new(
    cleanup_db_job.worker(),
    scheduled_job.interval_minutes(1)
  )

// Cron-like schedule to run at the 10th second and 10th minute of every hour
scheduled_job.new(
    delete_expired_sessions_job.worker(),
    scheduled_job.Schedule(
      scheduled_job.new_schedule()
      |> scheduled_job.on_second(10)
      |> scheduled_job.on_minute(10)
    )
  )

Note: The scheduled time is when the job becomes available to the queue, but the exact run time depends on the scheduled job's polling interval and speed.

Event Listeners

bg_jobs generates events during job processing, which can be used for logging, monitoring, or custom telemetry. An event listener can be added globally to capture all job events, or it can be attached to specific queues or scheduled jobs for targeted monitoring.

To use the built-in logger_event_listener:

bg_jobs.with_event_listener(logger_event_listener.listener)  // Logs all job processing events

Creating a Custom Event Listener

Implement a custom event listener by defining a function with the following signature:

pub type EventListener = fn(Event) -> Nil

If it’s registered, this function will be called whenever an event occurs, receiving an Event object as its parameter. You can register your custom listener with bg_jobs (global listener), a queue, or a scheduled job as needed.
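A listener matching this signature might look like the sketch below. The Event type defined here is a hypothetical stand-in with made-up variants so that the example is self-contained; the real variants are defined by bg_jobs and are not shown in this document.

```gleam
import gleam/io

/// Hypothetical stand-in for the library's Event type.
pub type Event {
  JobStarted(job_name: String)
  JobFailed(job_name: String, reason: String)
}

pub type EventListener =
  fn(Event) -> Nil

/// Only reports failures and ignores every other event.
pub fn failure_logger(event: Event) -> Nil {
  case event {
    JobFailed(job_name, reason) ->
      io.println("job " <> job_name <> " failed: " <> reason)
    _ -> Nil
  }
}

pub fn main() {
  // Any function with the right shape can be used as a listener.
  let listener: EventListener = failure_logger
  listener(JobStarted("example_job"))
  listener(JobFailed("example_job", "boom"))
}
```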

Example of adding a custom event listener to a specific queue:

queue.with_event_listener(my_custom_event_listener())

Db adapters

bg_jobs includes two built-in database adapters: Postgres (via pog) and SQLite (via sqlight). These adapters allow seamless integration with popular databases, enabling you to use background job processing without additional dependencies. If you are using a different database, refer to these implementations for guidance on building a compatible adapter.

Adding a database adapter to bg_jobs:

let bg = static_supervisor.new(static_supervisor.OneForOne)
  |> bg_jobs.new(sqlite_db_adapter.new(conn, []))

Further documentation can be found at https://hexdocs.pm/bg_jobs.

Development

gleam run   # Run the project
gleam test  # Run the tests
just watch-test # Run tests in watch mode

TODO
