Using Oban for MCP Tasks

Copy Markdown View Source

Oban is a natural fit for MCP's Tasks protocol. Oban job states map directly to MCP task statuses, PostgreSQL persistence works across nodes, and Oban Pro workflows enable chained task execution.

Status Mapping

| MCP Task Status  | Oban Job State            |
| ---------------- | ------------------------- |
| `working`        | `executing` / `available` |
| `input_required` | `{:snooze, seconds}`      |
| `completed`      | `completed`               |
| `failed`         | `discarded`               |
| `cancelled`      | `cancelled`               |

Setup

Add to your deps:

{:oban, "~> 2.18"}

Migration

defmodule MyApp.Repo.Migrations.CreateMcpTasks do
  use Ecto.Migration

  # Creates the mcp_tasks table that mirrors MCP task state alongside the
  # Oban job performing the work (linked via oban_job_id).
  def change do
    create table(:mcp_tasks, primary_key: false) do
      # MCP task ids are externally generated strings, so the table uses
      # task_id as a natural primary key instead of an auto-increment id.
      add :task_id, :string, primary_key: true
      # Plain integer reference to oban_jobs.id — no FK constraint here,
      # presumably so task rows survive Oban job pruning (confirm intent).
      add :oban_job_id, :integer
      # One of: working / input_required / completed / failed / cancelled
      # (enforced in the McpTask changeset, not at the database level).
      add :status, :string, default: "working"
      add :method, :string
      add :result, :map
      add :metadata, :map, default: %{}
      timestamps()
    end

    # Supports filtered listing by status and job-id lookups on cancel.
    create index(:mcp_tasks, [:status])
    create index(:mcp_tasks, [:oban_job_id])
  end
end

Schema

defmodule MyApp.McpTask do
  @moduledoc """
  Ecto schema mirroring an MCP task and its backing Oban job.
  """

  use Ecto.Schema
  import Ecto.Changeset

  # Closed set of MCP task states accepted by the changeset.
  @statuses ~w(working input_required completed failed cancelled)
  @castable [:task_id, :oban_job_id, :status, :method, :result, :metadata]

  # Task ids are externally generated strings used as the primary key.
  @primary_key {:task_id, :string, autogenerate: false}
  schema "mcp_tasks" do
    field :oban_job_id, :integer
    field :status, :string, default: "working"
    field :method, :string
    field :result, :map
    field :metadata, :map, default: %{}
    timestamps()
  end

  @doc """
  Builds a changeset for creating or updating a task.

  Requires `:task_id` and restricts `:status` to the MCP task states.
  """
  def changeset(task, attrs) do
    task
    |> cast(attrs, @castable)
    |> validate_required([:task_id])
    |> validate_inclusion(:status, @statuses)
  end
end

Worker

defmodule MyApp.McpTaskWorker do
  @moduledoc """
  Oban worker that executes an MCP task and keeps the `mcp_tasks` row in
  sync with the job's outcome (`completed` / `failed` / `input_required`).
  """

  use Oban.Worker,
    queue: :mcp_tasks,
    max_attempts: 3,
    # Dedupe on task_id for 5 minutes so re-submitting the same task id
    # does not enqueue a second job.
    unique: [period: 300, fields: [:args], keys: [:task_id]]

  @impl Oban.Worker
  def perform(%Oban.Job{args: %{"task_id" => task_id} = args}) do
    update_status(task_id, "working")

    case execute(args) do
      {:ok, result} ->
        update_task(task_id, %{status: "completed", result: result})
        :ok

      {:error, reason} ->
        # Record the failure; returning {:error, reason} lets Oban retry
        # up to max_attempts, flipping the row back to "working" per try.
        update_task(task_id, %{status: "failed", result: %{"error" => inspect(reason)}})
        {:error, reason}

      {:input_required, schema} ->
        update_task(task_id, %{
          status: "input_required",
          metadata: %{"schema" => schema}
        })

        # Snooze 5 min, retry after user provides input.
        {:snooze, 300}
    end
  end

  # Resolves the handler module named in args and delegates to its
  # execute/1. Module.safe_concat/1 only resolves already-existing module
  # atoms (it raises otherwise), so untrusted input cannot mint new atoms —
  # same guarantee as String.to_existing_atom/1, but idiomatic.
  defp execute(%{"handler" => handler} = args) do
    module = Module.safe_concat([handler])
    apply(module, :execute, [args])
  end

  defp update_status(task_id, status), do: update_task(task_id, %{status: status})

  # Best-effort status update: a missing row (e.g. pruned) is not an error.
  # Always returns :ok so every caller sees a consistent shape (the
  # original returned :ok or a {:ok, _}/{:error, _} tuple depending on
  # the branch).
  defp update_task(task_id, updates) do
    case MyApp.Repo.get(MyApp.McpTask, task_id) do
      nil ->
        :ok

      task ->
        task
        |> MyApp.McpTask.changeset(updates)
        |> MyApp.Repo.update()

        :ok
    end
  end
end

Task Store

defmodule MyApp.ObanTaskStore do
  @moduledoc """
  Task store backed by `mcp_tasks` rows in Postgres plus the Oban jobs
  that perform the actual work.
  """

  import Ecto.Query

  @repo MyApp.Repo

  @doc """
  Creates a task row and enqueues the Oban job that will execute it.

  `worker_args` are passed to the worker with `"task_id"` merged in.

  ## Options

    * `:worker` — worker module to enqueue (default `MyApp.McpTaskWorker`)

  Returns `{:ok, %{"task_id" => task_id, "status" => "working"}}`.
  Raises (via match failure) if the insert or enqueue fails.
  """
  def create_with_job(task_id, worker_args, opts \\ []) do
    # Create the task record.
    attrs = %{task_id: task_id, status: "working", method: worker_args["method"]}
    {:ok, task} = @repo.insert(MyApp.McpTask.changeset(%MyApp.McpTask{}, attrs))

    # Enqueue the Oban job.
    job_args = Map.put(worker_args, "task_id", task_id)
    worker = Keyword.get(opts, :worker, MyApp.McpTaskWorker)
    {:ok, job} = worker.new(job_args) |> Oban.insert()

    # Link task to job, reusing the struct we just inserted instead of
    # re-fetching the row (the original did a redundant get! round-trip).
    {:ok, _task} =
      task
      |> MyApp.McpTask.changeset(%{oban_job_id: job.id})
      |> @repo.update()

    {:ok, %{"task_id" => task_id, "status" => "working"}}
  end

  @doc """
  Fetches a task by id as a string-keyed map, or `{:error, :not_found}`.
  """
  def get(task_id) do
    case @repo.get(MyApp.McpTask, task_id) do
      nil -> {:error, :not_found}
      task -> {:ok, to_map(task)}
    end
  end

  @doc """
  Cancels a task: cancels its Oban job (when one is linked) and marks the
  row `cancelled`. Returns `:ok`, or `{:error, :not_found}` for an
  unknown id.
  """
  def cancel(task_id) do
    case @repo.get(MyApp.McpTask, task_id) do
      nil ->
        {:error, :not_found}

      task ->
        if task.oban_job_id, do: Oban.cancel_job(task.oban_job_id)
        # Best-effort: the update result is intentionally ignored.
        @repo.update(MyApp.McpTask.changeset(task, %{status: "cancelled"}))
        :ok
    end
  end

  @doc """
  Lists tasks newest-first. Options: `:status` filter, `:limit` cap.
  """
  def list(opts \\ []) do
    query = from(t in MyApp.McpTask, order_by: [desc: t.inserted_at])
    query = if s = opts[:status], do: where(query, [t], t.status == ^s), else: query
    query = if l = opts[:limit], do: limit(query, ^l), else: query
    @repo.all(query) |> Enum.map(&to_map/1)
  end

  # Wire-format projection of a task row (string keys for JSON-RPC).
  defp to_map(task) do
    %{
      "task_id" => task.task_id,
      "status" => task.status,
      "result" => task.result,
      "metadata" => task.metadata,
      "created_at" => task.inserted_at
    }
  end
end

Usage in MCP Server

defmodule MyApp.MCPServer do
  use ConduitMcp.Server

  # Long-running tool: enqueues the analysis as an MCP task backed by an
  # Oban job and returns immediately with the task id in _meta, so the
  # client polls for completion instead of holding the request open.
  tool "analyze_dataset", "Run long-running analysis" do
    scope "analysis:run"
    param :dataset_id, :string, "Dataset ID", required: true

    handle fn _conn, %{"dataset_id" => dataset_id} ->
      task_id = ConduitMcp.Tasks.generate_id()

      # "handler" names the module the worker resolves and invokes
      # (MyApp.DatasetAnalyzer.execute/1) — it must already be loaded.
      {:ok, _} = MyApp.ObanTaskStore.create_with_job(task_id, %{
        "handler" => "MyApp.DatasetAnalyzer",
        "dataset_id" => dataset_id
      })

      # Return immediately — client polls via tasks/get
      {:ok, %{
        "content" => [%{"type" => "text", "text" => "Analysis started: #{task_id}"}],
        "_meta" => %{"taskId" => task_id, "status" => "working"}
      }}
    end
  end
end

Why Oban?

  • Multi-node: All nodes share the PostgreSQL-backed job queue
  • Retries: Failed tasks retry automatically (configurable)
  • Uniqueness: Prevent duplicate tasks with unique option
  • Observability: Oban Web dashboard shows task status in real-time
  • Cancellation: Oban.cancel_job/1 maps directly to MCP task cancellation
  • Snooze: {:snooze, seconds} handles input_required → wait for user input → resume
  • Oban Pro Workflows: Chain dependent tasks (e.g., fetch → transform → analyze)