EctoMiddleware

EctoMiddleware

Hex Version Hex Docs CI Status Coverage Status

Intercept and transform Ecto repository operations using a middleware pipeline pattern.

EctoMiddleware provides a clean, composable way to add cross-cutting concerns to your Ecto operations. Inspired by Absinthe's middleware and Plug, it allows you to transform data before and after database operations, or replace operations entirely.

Features

  • Transform data before it reaches the database (normalization, validation, enrichment)
  • Transform data after database operations (logging, notifications, caching)
  • Replace operations entirely (soft deletes, read-through caching, authorization)
  • Halt execution with authorization checks or validation failures
  • Telemetry integration for observability
  • Backwards compatible with v1.x middleware

Installation

Add ecto_middleware to your dependencies in mix.exs:

def deps do
  [
    {:ecto_middleware, "~> 2.0"}
  ]
end

Quick Start

1. Enable middleware in your Repo

defmodule MyApp.Repo do
  use Ecto.Repo,
    otp_app: :my_app,
    adapter: Ecto.Adapters.Postgres

  use EctoMiddleware.Repo

  # Define middleware for specific operations
  @impl EctoMiddleware.Repo
  def middleware(action, resource) when is_insert(action, resource) do
    [NormalizeEmail, HashPassword, AuditLog]
  end

  def middleware(action, resource) when is_delete(action, resource) do
    [SoftDelete, AuditLog]
  end

  def middleware(_action, _resource) do
    [AuditLog]
  end
end

2. Write middleware

defmodule NormalizeEmail do
  use EctoMiddleware

  @impl EctoMiddleware
  def process_before(changeset, _resolution) do
    case Ecto.Changeset.fetch_change(changeset, :email) do
      {:ok, email} ->
        {:cont, Ecto.Changeset.put_change(changeset, :email, String.downcase(email))}

      :error ->
        {:cont, changeset}
    end
  end
end

3. Use your Repo as normal

# Middleware runs automatically!

%User{}
|> User.changeset(%{name: "Alice", email: "ALICE@EXAMPLE.COM"})
|> MyApp.Repo.insert()
# => Email is normalized to "alice@example.com" before insertion

Core Concepts

The API

Implement process_before/2 to transform data before the database operation:

defmodule AddTimestamp do
  use EctoMiddleware

  @impl EctoMiddleware
  def process_before(changeset, _resolution) do
    {:cont, Ecto.Changeset.put_change(changeset, :processed_at, DateTime.utc_now())}
  end
end

Implement process_after/2 to process data after the database operation:

defmodule NotifyAdmin do
  use EctoMiddleware

  @impl EctoMiddleware
  def process_after({:ok, user} = result, _resolution) do
    Task.start(fn -> send_notification(user) end)
    {:cont, result}
  end

  def process_after(result, _resolution) do
    {:cont, result}
  end
end

Implement process/2 to wrap around the entire operation:

defmodule MeasureLatency do
  use EctoMiddleware

  @impl EctoMiddleware
  def process(resource, resolution) do
    start_time = System.monotonic_time()

    # Yields control to the next middleware (or Repo operation) in the chain
    # before resuming here.
    {result, _updated_resolution} = yield(resource, resolution)

    duration = System.monotonic_time() - start_time
    Logger.info("#{resolution.action} took #{duration}ns")

    result
  end
end

Important: When using process/2, you must call yield/2 to continue the middleware chain.

Halting Execution

Return {:halt, value} to stop the middleware chain and return a value immediately:

defmodule RequireAuth do
  use EctoMiddleware

  @impl EctoMiddleware
  def process(resource, resolution) do
    if authorized?(resolution) do
      {result, _} = yield(resource, resolution)
      result
    else
      {:halt, {:error, :unauthorized}}
    end
  end

  defp authorized?(resolution) do
    get_private(resolution, :current_user) != nil
  end
end

Guards for Operation Detection

EctoMiddleware provides guards to detect operations, especially useful for insert_or_update:

defmodule ConditionalMiddleware do
  use EctoMiddleware

  @impl EctoMiddleware
  def process_before(changeset, resolution) when is_insert(changeset, resolution) do
    {:cont, add_created_metadata(changeset)}
  end

  def process_before(changeset, resolution) when is_update(changeset, resolution) do
    {:cont, add_updated_metadata(changeset)}
  end

  def process_before(changeset, _resolution) do
    {:cont, changeset}
  end
end

Return Value Conventions

process_before/2 and process_after/2

Returning bare values from middleware is supported, but to be explicit, return one of:

  • {:cont, value} - Continue to next middleware
  • {:halt, value} - Stop execution, return value
  • {:cont, value, updated_resolution} - Continue with updated resolution
  • {:halt, value, updated_resolution} - Stop with updated resolution

Bare values are always treated as {:cont, value}.

Telemetry

EctoMiddleware emits telemetry events for observability:

Pipeline Events

  • [:ecto_middleware, :pipeline, :start] - Pipeline execution starts

    • Measurements: %{system_time: integer()}
    • Metadata: %{repo: module(), action: atom(), pipeline_id: reference()}
  • [:ecto_middleware, :pipeline, :stop] - Pipeline execution completes

    • Measurements: %{duration: integer()}
    • Metadata: %{repo: module(), action: atom()}
  • [:ecto_middleware, :pipeline, :exception] - Pipeline execution fails

    • Measurements: %{duration: integer()}
    • Metadata: %{repo: module(), action: atom(), kind: atom(), reason: term()}

Middleware Events

  • [:ecto_middleware, :middleware, :start] - Individual middleware starts

    • Measurements: %{system_time: integer()}
    • Metadata: %{middleware: module(), pipeline_id: reference()}
  • [:ecto_middleware, :middleware, :stop] - Individual middleware completes

    • Measurements: %{duration: integer()}
    • Metadata: %{middleware: module(), result: :cont | :halt}

  • [:ecto_middleware, :middleware, :exception] - Individual middleware fails

    • Measurements: %{duration: integer()}
    • Metadata: %{middleware: module(), kind: atom(), reason: term()}

Example handler:

:telemetry.attach(
  "log-slow-middleware",
  [:ecto_middleware, :middleware, :stop],
  fn _event, %{duration: duration}, %{middleware: middleware}, _config ->
    if duration > 1_000_000 do  # 1ms
      Logger.warn("Slow middleware: #{inspect(middleware)} took #{duration}ns")
    end
  end,
  nil
)

Migration from V1

V1 middleware continue to work but emit deprecation warnings. See the Migration Guide for details.

Key differences:

Silencing Deprecation Warnings

During migration, you can silence warnings temporarily:

# In config/config.exs
config :ecto_middleware, :silence_deprecation_warnings, true

This should only be used during migration - deprecated APIs will be removed in v3.0.

Example Usage in the Wild

EctoHooks - Adds before_* and after_* callbacks to Ecto schemas, similar to the old Ecto.Model callbacks. Implemented entirely using EctoMiddleware.

See the implementation for real-world middleware examples.

Documentation

Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

License

MIT License. See LICENSE for details.