README
EctoMiddleware
Intercept and transform Ecto repository operations using a middleware pipeline pattern.
EctoMiddleware provides a clean, composable way to add cross-cutting concerns to your Ecto operations. Inspired by Absinthe's middleware and Plug, it allows you to transform data before and after database operations, or replace operations entirely.
Features
- Transform data before it reaches the database (normalization, validation, enrichment)
- Transform data after database operations (logging, notifications, caching)
- Replace operations entirely (soft deletes, read-through caching, authorization)
- Halt execution with authorization checks or validation failures
- Telemetry integration for observability
- Backwards compatible with v1.x middleware
Installation
Add ecto_middleware to your dependencies in mix.exs:
def deps do
  [
    {:ecto_middleware, "~> 2.0"}
  ]
end
Quick Start
1. Enable middleware in your Repo
defmodule MyApp.Repo do
  use Ecto.Repo,
    otp_app: :my_app,
    adapter: Ecto.Adapters.Postgres
  use EctoMiddleware.Repo
  # Define middleware for specific operations
  @impl EctoMiddleware.Repo
  def middleware(action, resource) when is_insert(action, resource) do
    [NormalizeEmail, HashPassword, AuditLog]
  end
  def middleware(action, resource) when is_delete(action, resource) do
    [SoftDelete, AuditLog]
  end
  def middleware(_action, _resource) do
    [AuditLog]
  end
end
2. Write middleware
defmodule NormalizeEmail do
  use EctoMiddleware
  @impl EctoMiddleware
  def process_before(changeset, _resolution) do
    case Ecto.Changeset.fetch_change(changeset, :email) do
      {:ok, email} ->
        {:cont, Ecto.Changeset.put_change(changeset, :email, String.downcase(email))}
      :error ->
        {:cont, changeset}
    end
  end
end
3. Use your Repo as normal
# Middleware runs automatically!
%User{}
|> User.changeset(%{name: "Alice", email: "ALICE@EXAMPLE.COM"})
|> MyApp.Repo.insert()
# => Email is normalized to "alice@example.com" before insertion
Core Concepts
The API
Implement process_before/2 to transform data before the database operation:
defmodule AddTimestamp do
  use EctoMiddleware
  @impl EctoMiddleware
  def process_before(changeset, _resolution) do
    {:cont, Ecto.Changeset.put_change(changeset, :processed_at, DateTime.utc_now())}
  end
end
Implement process_after/2 to process data after the database operation:
defmodule NotifyAdmin do
  use EctoMiddleware
  @impl EctoMiddleware
  def process_after({:ok, user} = result, _resolution) do
    Task.start(fn -> send_notification(user) end)
    {:cont, result}
  end
  def process_after(result, _resolution) do
    {:cont, result}
  end
end
Implement process/2 to wrap around the entire operation:
defmodule MeasureLatency do
  use EctoMiddleware
  # Logger's macros must be required before use.
  require Logger
  @impl EctoMiddleware
  def process(resource, resolution) do
    start_time = System.monotonic_time()
    # Yields control to the next middleware (or Repo operation) in the chain
    # before resuming here.
    {result, _updated_resolution} = yield(resource, resolution)
    # System.monotonic_time/0 returns native time units, so convert before logging.
    elapsed = System.monotonic_time() - start_time
    duration_us = System.convert_time_unit(elapsed, :native, :microsecond)
    Logger.info("#{resolution.action} took #{duration_us}µs")
    result
  end
end
Important: When using process/2, you must call yield/2 to continue the middleware chain.
Halting Execution
Return {:halt, value} to stop the middleware chain and return a value immediately:
defmodule RequireAuth do
  use EctoMiddleware
  @impl EctoMiddleware
  def process(resource, resolution) do
    if authorized?(resolution) do
      {result, _} = yield(resource, resolution)
      result
    else
      {:halt, {:error, :unauthorized}}
    end
  end
  defp authorized?(resolution) do
    get_private(resolution, :current_user) != nil
  end
end
Guards for Operation Detection
EctoMiddleware provides guards to detect operations, especially useful for insert_or_update:
defmodule ConditionalMiddleware do
  use EctoMiddleware
  @impl EctoMiddleware
  def process_before(changeset, resolution) when is_insert(changeset, resolution) do
    {:cont, add_created_metadata(changeset)}
  end
  def process_before(changeset, resolution) when is_update(changeset, resolution) do
    {:cont, add_updated_metadata(changeset)}
  end
  def process_before(changeset, _resolution) do
    {:cont, changeset}
  end
end
Return Value Conventions
process_before/2 and process_after/2
Returning bare values from middleware is supported, but to be explicit, return one of:
- {:cont, value} - Continue to next middleware
- {:halt, value} - Stop execution, return value
- {:cont, value, updated_resolution} - Continue with updated resolution
- {:halt, value, updated_resolution} - Stop with updated resolution
Bare values are always treated as {:cont, value}.
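The conventions above can be illustrated with a small, dependency-free toy. This is a sketch only, not EctoMiddleware's implementation: the module name PipelineSketch and its functions are hypothetical, steps are plain anonymous functions rather than middleware modules, and the three-element forms that carry an updated resolution are left out for brevity.

```elixir
defmodule PipelineSketch do
  @moduledoc """
  Hypothetical toy reducer showing the {:cont, _} / {:halt, _} conventions.
  It is NOT how EctoMiddleware is implemented.
  """

  # Bare values are treated as {:cont, value}.
  def normalize({:cont, value}), do: {:cont, value}
  def normalize({:halt, value}), do: {:halt, value}
  def normalize(value), do: {:cont, value}

  # Run each step in order; the first {:halt, value} ends the run
  # and `value` becomes the overall result.
  def run(value, steps) do
    Enum.reduce_while(steps, value, fn step, acc ->
      case normalize(step.(acc)) do
        {:cont, next} -> {:cont, next}
        {:halt, final} -> {:halt, final}
      end
    end)
  end
end

# A step returning a bare value.
trim = fn email -> String.trim(email) end

# A step that halts on blank input.
reject_blank = fn
  "" -> {:halt, {:error, :blank_email}}
  email -> {:cont, email}
end

# A step returning an explicit {:cont, value}.
downcase = fn email -> {:cont, String.downcase(email)} end

PipelineSketch.run("  ALICE@EXAMPLE.COM ", [trim, reject_blank, downcase])
# => "alice@example.com"

PipelineSketch.run("   ", [trim, reject_blank, downcase])
# => {:error, :blank_email} (downcase never runs)
```

Returning the explicit tuples keeps the intent visible at each step, which is why the list above recommends them even though bare values work.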
Telemetry
EctoMiddleware emits telemetry events for observability:
Pipeline Events
- [:ecto_middleware, :pipeline, :start] - Pipeline execution starts
  - Measurements: %{system_time: integer()}
  - Metadata: %{repo: module(), action: atom(), pipeline_id: reference()}
- [:ecto_middleware, :pipeline, :stop] - Pipeline execution completes
  - Measurements: %{duration: integer()}
  - Metadata: %{repo: module(), action: atom()}
- [:ecto_middleware, :pipeline, :exception] - Pipeline execution fails
  - Measurements: %{duration: integer()}
  - Metadata: %{repo: module(), action: atom(), kind: atom(), reason: term()}
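As a rough sketch of consuming the pipeline events, the module below attaches one handler to the :stop and :exception events and turns each into a log line. The event names and metadata keys come from the list above; the module name PipelineTelemetrySketch, the handler id, and the message format are assumptions made for illustration. The duration is printed as reported, without assuming a unit.

```elixir
defmodule PipelineTelemetrySketch do
  # Hypothetical helper module; not part of EctoMiddleware.
  require Logger

  @events [
    [:ecto_middleware, :pipeline, :stop],
    [:ecto_middleware, :pipeline, :exception]
  ]

  # Attach a single handler to both events.
  # Requires the :telemetry package to be available at runtime.
  def attach do
    :telemetry.attach_many("pipeline-sketch", @events, &__MODULE__.handle_event/4, nil)
  end

  # Telemetry handlers receive the event name, measurements, metadata and config.
  def handle_event(event, measurements, metadata, _config) do
    Logger.info(describe(event, measurements, metadata))
  end

  # Build a message for a completed pipeline.
  def describe([:ecto_middleware, :pipeline, :stop], %{duration: duration}, metadata) do
    %{repo: repo, action: action} = metadata
    "#{inspect(repo)}.#{action} pipeline finished (duration: #{duration})"
  end

  # Build a message for a failed pipeline.
  def describe([:ecto_middleware, :pipeline, :exception], %{duration: duration}, metadata) do
    %{repo: repo, action: action, kind: kind, reason: reason} = metadata

    "#{inspect(repo)}.#{action} pipeline failed with #{kind}: " <>
      "#{inspect(reason)} (duration: #{duration})"
  end
end
```

Calling PipelineTelemetrySketch.attach() once at application start would be enough. Passing a captured module function rather than an anonymous function follows the :telemetry recommendation for handlers.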
Middleware Events
- [:ecto_middleware, :middleware, :start] - Individual middleware starts
  - Measurements: %{system_time: integer()}
  - Metadata: %{middleware: module(), pipeline_id: reference()}
- [:ecto_middleware, :middleware, :stop] - Individual middleware completes
  - Measurements: %{duration: integer()}
  - Metadata: %{middleware: module(), result: :cont | :halt}
- [:ecto_middleware, :middleware, :exception] - Individual middleware fails
  - Measurements: %{duration: integer()}
  - Metadata: %{middleware: module(), kind: atom(), reason: term()}
Example handler:
# Logger's macros must be required before use.
require Logger
:telemetry.attach(
  "log-slow-middleware",
  [:ecto_middleware, :middleware, :stop],
  fn _event, %{duration: duration}, %{middleware: middleware}, _config ->
    if duration > 1_000_000 do # 1ms
      # Logger.warn/1 is deprecated in recent Elixir versions; use Logger.warning/1.
      Logger.warning("Slow middleware: #{inspect(middleware)} took #{duration}ns")
    end
  end,
  nil
)
Migration from V1
V1 middleware continue to work but emit deprecation warnings. See the Migration Guide for details.
Key differences:
- Use EctoMiddleware.Repo instead of EctoMiddleware in Repo modules
- Implement process_before/2, process_after/2, or process/2 instead of middleware/2
- No need for EctoMiddleware.Super marker
- Return {:cont, value} or {:halt, value} instead of bare values
Silencing Deprecation Warnings
During migration, you can silence warnings temporarily:
# In config/config.exs
config :ecto_middleware, :silence_deprecation_warnings, true
This should only be used during migration - deprecated APIs will be removed in v3.0.
Example Usage in the Wild
EctoHooks - Adds before_* and after_* callbacks to Ecto schemas, similar to the old Ecto.Model callbacks. Implemented entirely using EctoMiddleware.
See the implementation for real-world middleware examples.
Documentation
Contributing
Contributions are welcome! Please feel free to submit a Pull Request.
License
MIT License. See LICENSE for details.