Oban is a natural fit for MCP's Tasks protocol. Oban job states map directly to MCP task statuses, PostgreSQL persistence works across nodes, and Oban Pro workflows enable chained task execution.
Status Mapping
| MCP Task Status | Oban Job State |
|---|---|
| working | executing / available |
| input_required | scheduled (via {:snooze, seconds}) |
| completed | completed |
| failed | discarded |
| cancelled | cancelled |
Setup
Add to your deps:
{:oban, "~> 2.18"}

Migration
defmodule MyApp.Repo.Migrations.CreateMcpTasks do
  @moduledoc """
  Creates the `mcp_tasks` table: one row per MCP task, keyed by the task id and
  linked to the Oban job that executes it.
  """
  use Ecto.Migration

  def change do
    # The task id is a caller-supplied string, so the default auto-incrementing
    # `id` column is disabled and `task_id` is the primary key instead.
    create table(:mcp_tasks, primary_key: false) do
      add :task_id, :string, primary_key: true
      # Oban job ids are 64-bit (`bigserial`); a plain `:integer` column is
      # 32-bit in PostgreSQL and could not hold large job ids.
      add :oban_job_id, :bigint
      add :status, :string, default: "working"
      add :method, :string
      add :result, :map
      add :metadata, :map, default: %{}
      timestamps()
    end

    # Indexed for filtering tasks by status and for looking a task up by job.
    create index(:mcp_tasks, [:status])
    create index(:mcp_tasks, [:oban_job_id])
  end
end

Schema
defmodule MyApp.McpTask do
  @moduledoc """
  Ecto schema for a row of the `mcp_tasks` table.

  The primary key is the string `task_id`; it is never autogenerated, so the
  caller must supply it.
  """
  use Ecto.Schema
  import Ecto.Changeset

  # Fields that may be set through `changeset/2`.
  @castable [:task_id, :oban_job_id, :status, :method, :result, :metadata]

  # The only values accepted for `status`.
  @statuses ~w(working input_required completed failed cancelled)

  @primary_key {:task_id, :string, autogenerate: false}
  schema "mcp_tasks" do
    field :oban_job_id, :integer
    field :status, :string, default: "working"
    field :method, :string
    field :result, :map
    field :metadata, :map, default: %{}
    timestamps()
  end

  @doc """
  Builds a changeset for `task` from `attrs`.

  `task_id` is required, and `status`, when given, must be one of the known
  task statuses.
  """
  def changeset(task, attrs) do
    casted = cast(task, attrs, @castable)
    with_required = validate_required(casted, [:task_id])
    validate_inclusion(with_required, :status, @statuses)
  end
end

Worker
defmodule MyApp.McpTaskWorker do
  @moduledoc """
  Oban worker that runs the handler named in the job args and mirrors the
  outcome onto the matching `MyApp.McpTask` row.

  Expected job args (string keys):

    * `"task_id"` - id of the `mcp_tasks` row to update
    * `"handler"` - module name without the `Elixir.` prefix; the module must
      export `execute/1`, which receives the full args map and returns
      `{:ok, result}`, `{:error, reason}` or `{:input_required, schema}`

  Exceptions raised by the handler are not rescued here: they propagate to
  Oban and the task row is left untouched.
  """
  use Oban.Worker,
    queue: :mcp_tasks,
    max_attempts: 3,
    unique: [period: 300, fields: [:args], keys: [:task_id]]

  # Seconds to wait before re-running a job whose handler asked for input.
  @input_snooze_seconds 300

  @impl Oban.Worker
  def perform(%Oban.Job{args: %{"task_id" => task_id} = args} = job) do
    update_status(task_id, "working")

    case execute(args) do
      {:ok, result} ->
        update_task(task_id, %{status: "completed", result: result})
        :ok

      {:error, reason} ->
        # The job is retried until its attempts are exhausted, so the task is
        # only marked "failed" on the last attempt. Marking it earlier would
        # expose "failed" to clients and then flip it back to "working" on the
        # next run.
        if final_attempt?(job) do
          update_task(task_id, %{status: "failed", result: %{"error" => inspect(reason)}})
        end

        {:error, reason}

      {:input_required, schema} ->
        update_task(task_id, %{
          status: "input_required",
          metadata: %{"schema" => schema}
        })

        # Snooze, then run again after the user has had time to provide input.
        {:snooze, @input_snooze_seconds}
    end
  end

  # True when the current attempt is the last one the job is allowed.
  defp final_attempt?(%Oban.Job{attempt: attempt, max_attempts: max_attempts}) do
    attempt >= max_attempts
  end

  # Resolves the handler module from its string name and calls `execute/1` on it.
  # `String.to_existing_atom/1` raises `ArgumentError` when no such atom exists,
  # so unknown names cannot create new atoms.
  # NOTE(review): any existing module exporting `execute/1` can be invoked this
  # way; the "handler" arg must only ever be set by trusted server code.
  defp execute(%{"handler" => handler} = args) do
    module = String.to_existing_atom("Elixir." <> handler)
    apply(module, :execute, [args])
  end

  defp update_status(task_id, status), do: update_task(task_id, %{status: status})

  # Applies `updates` to the task row. A missing row is a no-op, and the result
  # of the update itself is returned to the caller, which ignores it.
  defp update_task(task_id, updates) do
    case MyApp.Repo.get(MyApp.McpTask, task_id) do
      nil -> :ok
      task -> MyApp.Repo.update(MyApp.McpTask.changeset(task, updates))
    end
  end
end

Task Store
defmodule MyApp.ObanTaskStore do
  @moduledoc """
  Task store backed by the `mcp_tasks` table and Oban jobs.

  Tasks are exposed to callers as plain string-keyed maps (see `get/1`).
  """
  import Ecto.Query

  @repo MyApp.Repo

  @doc """
  Creates the task row, enqueues its Oban job and links the two.

  `worker_args` is a string-keyed map; `"task_id"` is added to it before the
  job is built, and `worker_args["method"]` is stored on the task row.

  ## Options

    * `:worker` - worker module used to build the job
      (default `MyApp.McpTaskWorker`)

  Returns `{:ok, %{"task_id" => task_id, "status" => "working"}}`. Any failing
  step raises, as before, but all three writes now share one transaction, so a
  failure leaves neither an orphaned task row nor an unlinked job behind.
  """
  def create_with_job(task_id, worker_args, opts \\ []) do
    worker = Keyword.get(opts, :worker, MyApp.McpTaskWorker)
    attrs = %{task_id: task_id, status: "working", method: worker_args["method"]}
    job_args = Map.put(worker_args, "task_id", task_id)

    # NOTE(review): this assumes Oban is configured with the same repo, so the
    # job insert joins this transaction - confirm against the Oban config.
    @repo.transaction(fn ->
      {:ok, task} = @repo.insert(MyApp.McpTask.changeset(%MyApp.McpTask{}, attrs))
      {:ok, job} = job_args |> worker.new() |> Oban.insert()

      # Link the freshly inserted row directly instead of re-fetching it.
      {:ok, _linked} =
        task
        |> MyApp.McpTask.changeset(%{oban_job_id: job.id})
        |> @repo.update()

      %{"task_id" => task_id, "status" => "working"}
    end)
  end

  @doc """
  Fetches a task by id.

  Returns `{:ok, map}` or `{:error, :not_found}`.
  """
  def get(task_id) do
    case @repo.get(MyApp.McpTask, task_id) do
      nil -> {:error, :not_found}
      task -> {:ok, to_map(task)}
    end
  end

  @doc """
  Cancels the task's Oban job (when one is linked) and marks the task
  `"cancelled"`.

  Returns `:ok`, or `{:error, :not_found}` when no such task exists.
  """
  def cancel(task_id) do
    case @repo.get(MyApp.McpTask, task_id) do
      nil ->
        {:error, :not_found}

      task ->
        # NOTE(review): the status is overwritten even when the task is already
        # completed or failed, and the result of the update is ignored -
        # confirm that callers expect this.
        if task.oban_job_id, do: Oban.cancel_job(task.oban_job_id)
        @repo.update(MyApp.McpTask.changeset(task, %{status: "cancelled"}))
        :ok
    end
  end

  @doc """
  Lists tasks, newest first.

  ## Options

    * `:status` - only return tasks with this status
    * `:limit` - maximum number of tasks to return
  """
  def list(opts \\ []) do
    query = from(t in MyApp.McpTask, order_by: [desc: t.inserted_at])
    query = if s = opts[:status], do: where(query, [t], t.status == ^s), else: query
    query = if l = opts[:limit], do: limit(query, ^l), else: query
    @repo.all(query) |> Enum.map(&to_map/1)
  end

  # Converts a task struct into the string-keyed map handed to callers.
  defp to_map(task) do
    %{
      "task_id" => task.task_id,
      "status" => task.status,
      "result" => task.result,
      "metadata" => task.metadata,
      "created_at" => task.inserted_at
    }
  end
end

Usage in MCP Server
defmodule MyApp.MCPServer do
  # Example MCP server exposing one tool that hands its work to an Oban-backed
  # task instead of running it inside the request.
  use ConduitMcp.Server

  tool "analyze_dataset", "Run long-running analysis" do
    scope "analysis:run"
    param :dataset_id, :string, "Dataset ID", required: true

    handle fn _conn, %{"dataset_id" => dataset_id} ->
      task_id = ConduitMcp.Tasks.generate_id()

      # Args for the background job: the handler module name plus its input.
      job_args = %{
        "handler" => "MyApp.DatasetAnalyzer",
        "dataset_id" => dataset_id
      }

      {:ok, _} = MyApp.ObanTaskStore.create_with_job(task_id, job_args)

      # Reply straight away; the client follows progress by polling tasks/get.
      text = "Analysis started: #{task_id}"

      response = %{
        "content" => [%{"type" => "text", "text" => text}],
        "_meta" => %{"taskId" => task_id, "status" => "working"}
      }

      {:ok, response}
    end
  end
end

Why Oban?
- Multi-node: All nodes share the PostgreSQL-backed job queue
- Retries: Failed tasks retry automatically (configurable)
- Uniqueness: Prevent duplicate tasks with the `unique` option
- Observability: Oban Web dashboard shows task status in real-time
- Cancellation: `Oban.cancel_job/1` maps directly to MCP task cancellation
- Snooze: `{:snooze, seconds}` handles `input_required` → wait for user input → resume
- Oban Pro Workflows: Chain dependent tasks (e.g., fetch → transform → analyze)