bg_jobs - Background job processing for your Gleam app
Docs are WIP right now
Most of it is correct but not complete, some parts are duplicated.
What is bg_jobs
When developing web applications, certain tasks, like processing large files or performing heavy calculations, can slow down regular request handling. bg_jobs helps by moving these time-consuming operations to a background queue, keeping your application responsive.
This library is SQL-driven, with adapters for sqlite and postgres included, so you don’t need extra dependencies to get background processing up and running. Just connect to your existing database, set up your background jobs, and your app can handle more demanding tasks without impacting user experience.
bg_jobs provides the following:
- Queues - including delayed execution
- Scheduled jobs - on an interval or cron-like schedule
Time and date
All dates should be passed in UTC+0.
Installation
gleam add bg_jobs
Example usage
A complete example setup can be found in the /example directory.
import bg_jobs
import bg_jobs/jobs
import bg_jobs/queue
import bg_jobs/sqlite_db_adapter
import gleam/erlang/process
import gleam/io
import gleam/otp/static_supervisor

fn example_worker() {
  jobs.Worker(job_name: "example_job", handler: fn(job: jobs.Job) {
    io.print(job.payload)
    // Workers report success or failure by returning a Result
    Ok(Nil)
  })
}

pub fn main() {
  let conn = todo as "the sqlight connection"

  // build() returns a Result; crash early if setup fails
  let assert Ok(bg) =
    static_supervisor.new(static_supervisor.OneForOne)
    |> bg_jobs.new(sqlite_db_adapter.new(conn, []))
    |> bg_jobs.with_queue(
      queue.new("default_queue")
      |> queue.with_worker(example_worker()),
    )
    |> bg_jobs.build()

  // Dispatch a new job
  jobs.new(example_worker().job_name, "Hello!")
  |> bg_jobs.enqueue(bg)

  // Keep the main process alive so the queue can pick up the job
  process.sleep_forever()
}
This example sets up a default_queue to handle example_worker jobs. When a job with the payload “Hello!” is dispatched, it is stored in the database, picked up by the queue, and logged.
Architecture
TODO: expand this section
Program architecture:
- otp supervisor
  - otp worker - queue (0 or more)
    - Polls the database on a specified interval, looking for available jobs of the type it can handle
  - otp worker - scheduled jobs queue (0 or more)
    - Polls the database on a specified interval, looking for available jobs of the type it can handle
    - Enqueues a new job with the appropriate available_at after processing is done
  - otp worker - monitor (1 per program)
    - Holds Subjects to all workers in bg_jobs. This is used for named lookup
Durability
TODO: update, this is not exactly correct any more..
Jobs are claimed with a timestamp and queue/scheduled job name.
Each queue and scheduled job should have a unique name even across machines.
Whenever a worker starts up, it clears out previously claimed jobs.
This way, if a worker crashes and the supervisor restarts it, there are no
abandoned jobs.
Supervisor Integration in bg_jobs
bg_jobs is an OTP-based job scheduling system that integrates with Erlang’s supervision tree. When setting up bg_jobs, you need to pass a static_supervisor.Builder along with a db_adapter. This provides flexibility, allowing you to configure the supervisor’s settings before passing it into bg_jobs. You can also choose to supervise the created supervisor itself.
Configuring the Supervisor for bg_jobs
To set custom parameters for the bg_jobs supervisor, you need to first create the supervisor with the desired strategy and configuration settings.
Example 1: Setting restart_tolerance for the bg_jobs Supervisor
You can configure the restart_tolerance setting for the supervisor, which determines how many restarts are tolerated in a given time window before the supervisor itself is terminated.
static_supervisor.new(static_supervisor.OneForOne) // Create a new supervisor with the OneForOne strategy
|> static_supervisor.restart_tolerance(10, 1) // Set restart tolerance to 10 restarts in 1 second
|> bg_jobs.new(db_adapter) // Create a new bg_jobs instance with the specified db_adapter
|> bg_jobs.build() // Build and start the bg_jobs supervision tree
In this example:
- static_supervisor.new(static_supervisor.OneForOne) creates a new supervisor with the OneForOne strategy.
- static_supervisor.restart_tolerance(10, 1) configures the supervisor to tolerate 10 restarts within a 1-second window.
- bg_jobs.new(db_adapter) creates a new instance of bg_jobs, passing in the necessary db_adapter.
- bg_jobs.build() builds and starts the bg_jobs system under the supervision tree.
Example 2: Supervising the bg_jobs Supervisor
You can also add the bg_jobs supervisor as a child to another supervisor, allowing you to control its lifecycle and supervise it as part of your system.
fn setup_bg_jobs() -> Result(bg_jobs.BgJobs, errors.BgJobError) {
  // Define how the bg_jobs supervisor is set up
  todo
}

static_supervisor.new(static_supervisor.OneForOne) // Create a new supervisor with the OneForOne strategy
|> static_supervisor.add(static_supervisor.supervisor_child("bg_jobs", fn() { // Add the bg_jobs supervisor as a child
  setup_bg_jobs() // Set up bg_jobs
  |> result.map(fn(bg) { bg.supervisor }) // Map the result to return the supervisor's pid
}))
In this example:
- static_supervisor.new(static_supervisor.OneForOne) creates a new supervisor with the OneForOne strategy.
- static_supervisor.add(static_supervisor.supervisor_child("bg_jobs", fn() { ... })) adds the bg_jobs supervisor as a child of the main supervisor.
- setup_bg_jobs() contains the logic for setting up the bg_jobs system, which could include creating job schedulers and configuring database adapters.
- result.map(fn(bg) { bg.supervisor }) ensures that the supervisor for the bg_jobs system is returned and supervised correctly.
Queue
Each queue that you add to the bg_jobs program spawns an Erlang/OTP actor that manages that queue. The actor is responsible for polling the database for jobs it has workers for.
A worker is added to a queue using the queue.with_worker(Queue, Worker) function. Jobs are added to the database by calling bg_jobs.enqueue(JobRequest).
When a queue actor finds a job it can handle, it claims that job by setting the reserved_at and reserved_by columns in the database to the current datetime (UTC+0) and the current process's pid. This ensures no other queue claims that job.
It then spawns a new process and executes the job's worker in that process, with the job data from the database.
The worker should not panic but instead return a result. If the worker panics, the queue actor will crash and the supervisor will try to restart it. If the job execution results in an ok, a message is sent back to the queue actor and the actor moves the job from the jobs table to the jobs_succeeded table.
However, if the job execution results in an error, it’s retried as many times as specified by queue.with_max_retries (default: 3).
Should the job not succeed within the max_retries, a message is sent back to the queue actor and the actor moves the job from the jobs table to the jobs_failed table, with the string provided in the error as the exception.
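The retry flow described above can be sketched in plain Gleam. This is only an illustration and not bg_jobs' implementation: Outcome, run_with_retries and max_attempts are hypothetical names, and the sketch assumes the limit counts total attempts, which may differ from how queue.with_max_retries counts. The handler receives the attempt number so the behavior is easy to try out.

```gleam
/// Hypothetical outcome type, loosely mirroring the jobs_succeeded and
/// jobs_failed tables described above.
pub type Outcome {
  Succeeded(attempts: Int)
  Failed(attempts: Int, exception: String)
}

/// Calls `handler` until it returns Ok or `max_attempts` is reached.
/// Assumption: the limit is the total number of attempts.
pub fn run_with_retries(
  handler: fn(Int) -> Result(Nil, String),
  max_attempts: Int,
) -> Outcome {
  attempt(handler, 1, max_attempts)
}

fn attempt(
  handler: fn(Int) -> Result(Nil, String),
  n: Int,
  max_attempts: Int,
) -> Outcome {
  case handler(n) {
    Ok(Nil) -> Succeeded(attempts: n)
    Error(reason) ->
      case n >= max_attempts {
        // Out of attempts: keep the last error string as the exception
        True -> Failed(attempts: n, exception: reason)
        False -> attempt(handler, n + 1, max_attempts)
      }
  }
}
```

A handler that fails twice and then succeeds ends up as Succeeded with three attempts when the limit is 3. A handler that always fails ends up as Failed, carrying its last error string.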
Multiple queues may have the same workers, meaning multiple queues may be able to process the same type of job. This can be used to prioritize job execution.
For example, if one type of job is extra important, you could let all queues process that job type by specifying that job's worker on all queues. Or you might have a default queue that can handle all jobs and then add another queue dedicated to a specific job.
Jobs can have delayed processing, either at a specific time or at a relative time in the future. This can be specified with bg_jobs.job_with_available_at or job_with_available_in respectively.
Scheduled job
Scheduled jobs work the same way as queues internally with some key differences.
Each scheduled job has one and only one worker, meaning it can only handle one type of job.
A scheduled job is not manually enqueued. Instead, on startup and after each successful run, the next processing time is calculated and the job is enqueued with the available_at value set to that next processing time.
This also means that if a job's execution overlaps with what would have been the next processing time, the job won't run that time.
For example, if a job that should run every minute starts at 13:30, the next run time would normally be 13:31. But if the job execution takes 1 min 30 s, the next run will be at 13:32, since the job is only rescheduled after it completes.
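The arithmetic in that example can be checked with a small sketch. It assumes a schedule that fires at second 0 of every minute and represents times as seconds since midnight. The helpers at and next_minute_after are hypothetical and not part of bg_jobs.

```gleam
/// Seconds since midnight for a wall-clock time.
pub fn at(hour: Int, minute: Int, second: Int) -> Int {
  hour * 3600 + minute * 60 + second
}

/// First whole minute strictly after `finished_at`.
/// Integer division truncates, so this rounds down to the current
/// minute and then steps one minute forward.
pub fn next_minute_after(finished_at: Int) -> Int {
  { finished_at / 60 + 1 } * 60
}
```

A run that starts at at(13, 30, 0) and takes 90 seconds finishes at 13:31:30, so next_minute_after returns at(13, 32, 0) and the 13:31 slot is skipped. A run that takes only 10 seconds is followed by at(13, 31, 0).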
Schedules
Schedules for scheduled jobs can be specified in two ways: as an interval or as a cron-like schedule.
Scheduled jobs are either executing or waiting to be executed. Since the job’s next run is calculated on startup, the first thing it does is wait. However, if a job is already scheduled when the actor starts, it does nothing and waits for that job to become available.
Intervals
Intervals are the simplest form of schedule provided. Intervals can be specified in milliseconds, seconds, minutes, hours, days or weeks. This is done using the corresponding scheduled_job.new_interval_[time unit]() functions, e.g. scheduled_job.new_interval_minutes(20) would create a new interval of 20 minutes.
Execution time is not included in the interval. This means that what you’re really setting is how far in the future the job should be scheduled once the previous run has finished.
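Because execution time is not part of the interval, start times drift by the run time on every cycle. The sketch below makes that visible. It is an illustration only: start_times is a hypothetical helper, every run is assumed to take the same time, and all values are in minutes.

```gleam
import gleam/list

/// Start times of the first `runs` executions, given that each next run
/// is scheduled `interval` after the previous run has finished.
pub fn start_times(
  first_start: Int,
  run_time: Int,
  interval: Int,
  runs: Int,
) -> List(Int) {
  collect(first_start, run_time, interval, runs, [])
}

fn collect(
  start: Int,
  run_time: Int,
  interval: Int,
  remaining: Int,
  acc: List(Int),
) -> List(Int) {
  case remaining <= 0 {
    True -> list.reverse(acc)
    False ->
      // The next start is the finish time of this run plus the interval
      collect(start + run_time + interval, run_time, interval, remaining - 1, [
        start,
        ..acc
      ])
  }
}
```

With a 20 minute interval and runs that take 2 minutes, start_times(0, 2, 20, 3) gives [0, 22, 44] rather than [0, 20, 40].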
TODO: diagram
Schedules
Sometimes you need more advanced schedules. For example, you may want a job to run on the first day of the month at 08:30, or on Thursdays between the months of March and June.
This can be achieved using schedules. Schedules let you specify cron-like schedules using ranges or specific values.
To achieve this flexibility, schedules are defined by configuring the following time components:
- Seconds: (Default: 0)
- Minutes: (Default: Every)
- Hours: (Default: Every)
- Day of the Month: (Default: Every)
- Month: (Default: Every)
- Day of the Week: (Default: Every)
Each component can be set to:
- Every: Matches all possible values for that component.
- List: A combination of:
  - Specific values (e.g., seconds 0, 15, 30).
  - Ranges (e.g., hours from 1 to 10).
Using List, you can mix specific values and ranges, allowing highly customized schedules.
Behavior of Every
Setting a component to Every means it matches all possible values for that component. For example:
Setting hours to Every will trigger the job at any hour, as long as other time units match their criteria.
ScheduleBuilder Functions
The ScheduleBuilder provides a series of functions to configure each time component.
Creating a New Schedule
pub fn new_schedule() -> ScheduleBuilder
Creates a new schedule with the default configuration: triggers at the first second of every minute.
Combining Filters
Filters across time units are combined using AND logic. For example:
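let schedule =
  scheduled_job.new_schedule()
  |> scheduled_job.on_second(1)
  |> scheduled_job.between_seconds(10, 15)
  |> scheduled_job.on_minute(30)
  |> scheduled_job.on_hour(8)
  |> scheduled_job.between_hours(14, 16)
  |> scheduled_job.on_thursdays()
  |> scheduled_job.between_months(3, 6)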
This schedule will trigger only when:
- The second is 1 or within the range 10–15, and
- The minute is 30, and
- The hour is 8 or between 14:00–16:00, and
- It is Thursday, and
- It is during March through June.
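To make the combination rules concrete, here is a minimal, self-contained model of the matching logic. It is a sketch and not the ScheduleBuilder implementation: the types Part, Filter, Moment and Schedule are hypothetical, OneOf stands in for the List option, the day of the month is left out because the example does not set it, and Thursday is assumed to be weekday 4.

```gleam
import gleam/list

pub type Part {
  Value(Int)
  Range(from: Int, to: Int)
}

pub type Filter {
  Every
  OneOf(List(Part))
}

pub type Moment {
  Moment(second: Int, minute: Int, hour: Int, weekday: Int, month: Int)
}

pub type Schedule {
  Schedule(
    second: Filter,
    minute: Filter,
    hour: Filter,
    weekday: Filter,
    month: Filter,
  )
}

// OR within one time unit: any listed value or range may match
fn matches(filter: Filter, value: Int) -> Bool {
  case filter {
    Every -> True
    OneOf(parts) ->
      list.any(parts, fn(part) {
        case part {
          Value(v) -> v == value
          Range(from, to) -> from <= value && value <= to
        }
      })
  }
}

/// AND across time units: every unit has to match.
pub fn is_due(schedule: Schedule, at: Moment) -> Bool {
  matches(schedule.second, at.second)
  && matches(schedule.minute, at.minute)
  && matches(schedule.hour, at.hour)
  && matches(schedule.weekday, at.weekday)
  && matches(schedule.month, at.month)
}

/// The schedule from the example above, expressed in this model.
pub fn example() -> Schedule {
  Schedule(
    second: OneOf([Value(1), Range(10, 15)]),
    minute: OneOf([Value(30)]),
    hour: OneOf([Value(8), Range(14, 16)]),
    // Assumption: Thursday is weekday 4
    weekday: OneOf([Value(4)]),
    month: OneOf([Range(3, 6)]),
  )
}
```

In this model, a Thursday in May at 15:30:12 is due, while the same moment at 09:30:12 is not, because the hour matches neither 8 nor the 14 to 16 range.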
Summary
The ScheduleBuilder provides a powerful and flexible way to define schedules:
- OR logic within each time unit allows combining multiple specific values and ranges.
- AND logic across time units ensures precise scheduling, requiring all units to match their criteria simultaneously.
This design makes it easy to express even the most complex scheduling requirements.
Old docs..
Core Components
BgJobs
TODO: rewrite
BgJobs is the main entry point for setting up background job processing. It starts the otp supervision tree with all the queue, scheduled job and monitor actors that were specified.
Setup example:
let bg =
  static_supervisor.new(static_supervisor.OneForOne)
  |> bg_jobs.new(db_adapter)
  // Event listeners
  |> bg_jobs.with_event_listener(logger_event_listener.listener)
  // Queues
  |> bg_jobs.with_queue(queue.new("default_queue"))
  // Scheduled jobs
  |> bg_jobs.with_scheduled_job(scheduled_job.new(
    worker: cleanup_db_job.worker(),
    schedule: scheduled_job.interval_minutes(1),
  ))
  |> bg_jobs.build()
Queues
TODO: rewrite
Queues handle the processing of jobs in the background. They poll the database at a specified interval and pick up jobs that are ready for processing. Jobs are picked up by the appropriate queue actor based on the job name, and each queue can be customized for job concurrency, polling interval, retries, and worker configuration.
Queue setup example:
queue.new("example_queue")
|> queue.with_worker(...)
|> queue.build()
Scheduled jobs
TODO: rewrite
Scheduled jobs are jobs that run on a predefined schedule, either at a fixed interval (e.g., every 5 minutes) or on a more complex schedule (e.g., cron-like schedules). They self-manage their scheduling, calculating the next run time after each execution. A scheduled job becomes available once it reaches the specified time, but when it actually runs may depend on queue availability.
Scheduled job options:
- Interval Scheduling: Jobs run at a regular interval, for example every minute.
- Cron-like Scheduling: Supports complex scheduling, where you can set specific execution times.
Interval and cron-like scheduled jobs:
// Interval-based job that runs every minute
scheduled_job.new(
  cleanup_db_job.worker(),
  scheduled_job.interval_minutes(1)
)
// Cron-like schedule to run at the 10th second and 10th minute of every hour
scheduled_job.new(
  delete_expired_sessions_job.worker(),
  scheduled_job.Schedule(
    scheduled_job.new_schedule()
    |> scheduled_job.on_second(10)
    |> scheduled_job.on_minute(10)
  )
)
Note: The scheduled time is when the job becomes available to the queue, but the exact run time depends on the scheduled job's polling interval and processing speed.
Event Listeners
bg_jobs generates events during job processing, which can be used for logging, monitoring, or custom telemetry. An event listener can be added globally to capture all job events, or it can be attached to specific queues or scheduled jobs for targeted monitoring.
To use the built-in logger_event_listener:
bg_jobs.with_event_listener(logger_event_listener.listener) // Logs all job processing events
Creating a Custom Event Listener
Implement a custom event listener by defining a function with the following signature:
pub type EventListener = fn(Event) -> Nil
Once registered, this function will be called whenever an event occurs, receiving an Event value as its parameter. You can register your custom listener with bg_jobs (global listener), a queue, or a scheduled job as needed.
Example of adding a custom event listener to a specific queue:
queue.with_event_listener(my_custom_event_listener())
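A listener like my_custom_event_listener could look roughly like the sketch below. It is deliberately generic over the event type so that it does not assume any particular Event constructors, which are not listed in this document. It simply prints whatever it receives using string.inspect from the standard library.

```gleam
import gleam/io
import gleam/string

/// Returns a listener that prints every event it receives.
/// The type variable `event` keeps the sketch independent of the
/// concrete Event type; in real use it would be bg_jobs' Event.
pub fn my_custom_event_listener() -> fn(event) -> Nil {
  fn(event) { io.println("[bg_jobs] " <> string.inspect(event)) }
}
```

Since the returned function has the shape fn(Event) -> Nil, it should fit wherever an EventListener is expected, but check the listener type in the hexdocs before relying on this.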
Db adapters
bg_jobs includes two built-in database adapters: Postgres (via pog) and SQLite (via sqlight). These adapters allow seamless integration with popular databases, enabling you to use background job processing without additional dependencies. If you are using a different database, refer to these implementations for guidance on building a compatible adapter.
Adding a database adapter to bg_jobs:
let bg =
  static_supervisor.new(static_supervisor.OneForOne)
  |> bg_jobs.new(sqlite_db_adapter.new(conn, []))
Further documentation can be found at https://hexdocs.pm/bg_jobs.
Development
gleam run # Run the project
gleam test # Run the tests
just watch-test # Run tests in watch mode
TODO
- Documentation
- Split db adapters to their own packages
  - Need to solve testing, since the tests depend on sqlite
- Implement loop in monitor actor to cover the case where down message was not received