Skuld.Effects.DB (skuld v0.2.3)

View Source

Unified effect for database writes and transactions.

Provides single operations (working with changesets or ChangeEvents), bulk operations (working with lists of changesets, maps, or ChangeEvents), and transaction management.

Single Operations

Accept either a raw changeset or a ChangeEvent:

user <- DB.insert(changeset)
user <- DB.insert(ChangeEvent.insert(changeset))

user <- DB.update(changeset)
user <- DB.upsert(changeset, conflict_target: :email)
{:ok, _} <- DB.delete(user)

Bulk Operations

Accept lists of changesets, maps, or ChangeEvents (delete_all also accepts structs). All elements MUST be for the same schema:

{count, users} <- DB.insert_all(User, changesets, returning: true)
{count, nil}   <- DB.update_all(User, changesets)
{count, nil}   <- DB.delete_all(User, structs)

Transactions

result <- DB.transact(comp do
  user <- DB.insert(changeset)
  order <- DB.insert(order_changeset)
  return({user, order})
end)

Handlers

Production (Ecto Repo):

computation
|> DB.Ecto.with_handler(MyApp.Repo)
|> Comp.run!()

Testing (records calls, returns stubs):

{result, calls} =
  computation
  |> DB.Test.with_handler(fn op -> handle_op(op) end)
  |> Comp.run!()

No-op transactions (for tests):

computation
|> DB.Noop.with_handler()
|> Comp.run!()

Error Handling

Operations that fail (e.g., because of an invalid changeset) throw via the Throw effect. Install Throw.with_handler() and match the thrown value in a catch clause to handle such errors:

comp do
  user <- DB.insert(changeset)
  return(user)
catch
  {:invalid_changeset, cs} -> return({:error, cs})
end
|> DB.Ecto.with_handler(Repo)
|> Throw.with_handler()
|> Comp.run!()

Rollback on Throw

comp do
  result <- DB.transact(comp do
    _ <- Throw.throw(:something_went_wrong)
    return(:never_reached)
  end)
  return(result)
end
|> DB.Ecto.with_handler(MyApp.Repo)
|> Throw.with_handler()
|> Comp.run!()
# The transaction is rolled back and the thrown error is returned

Nested Transactions

Nested transact calls create savepoints (if supported by the database):

comp do
  result <- DB.transact(comp do
    _ <- insert_parent()

    inner_result <- DB.transact(comp do
      _ <- insert_child()
      return(:inner_done)
    end)

    return({:outer_done, inner_result})
  end)
  return(result)
end
|> DB.Ecto.with_handler(MyApp.Repo)
|> Comp.run!()

Batched Reads

For batched database reads using FiberPool, see Skuld.Effects.DB.Batch.

Summary

Functions

Delete a record from a struct, changeset, or ChangeEvent.

Delete multiple records.

Insert a record from a changeset or ChangeEvent.

Insert multiple records.

Explicitly roll back the current transaction.

Run a computation inside a database transaction.

Update a record from a changeset or ChangeEvent.

Update multiple records.

Insert or update a record (upsert) from a changeset or ChangeEvent.

Upsert multiple records.

Functions

delete(input, opts \\ [])

Delete a record from a struct, changeset, or ChangeEvent.

Returns {:ok, struct} on success, throws on error.

Example

{:ok, user} <- DB.delete(user)
{:ok, user} <- DB.delete(ChangeEvent.delete(changeset))

delete_all(schema, entries, opts \\ [])

Delete multiple records.

Accepts a list of structs, changesets, or ChangeEvents.

Returns {count, nil | [struct]}.

Example

{count, nil} <- DB.delete_all(User, users)

insert(input, opts \\ [])

Insert a record from a changeset or ChangeEvent.

Returns the inserted struct on success, throws on error.

Example

user <- DB.insert(User.changeset(attrs))
user <- DB.insert(ChangeEvent.insert(changeset))

insert_all(schema, entries, opts \\ [])

Insert multiple records.

Accepts a list of changesets, maps, or ChangeEvents. All must be for the same schema.

Returns {count, nil | [struct]}, depending on the :returning option.

Example

{count, users} <- DB.insert_all(User, changesets, returning: true)
{count, nil} <- DB.insert_all(User, maps)

rollback(reason)

@spec rollback(term()) :: Skuld.Comp.Types.computation()

Explicitly roll back the current transaction.

Must be called within a transact block. Returns {:rolled_back, reason} after the transaction is rolled back.

Example

comp do
  result <- DB.transact(comp do
    user <- DB.insert(changeset)

    if should_abort?(user) do
      _ <- DB.rollback({:aborted, user.id})
    end

    return(user)
  end)

  return(result)
end

transact(inner_comp)

Run a computation inside a database transaction.

The inner computation will be executed within a transaction scope. On normal completion, the transaction commits. On throw, suspend, or explicit rollback, the transaction rolls back.

Parameters

  • inner_comp - The computation to run inside the transaction

Returns

A computation that returns the result of the inner computation, or {:rolled_back, reason} if explicitly rolled back.

Example

comp do
  result <- DB.transact(comp do
    user <- DB.insert(changeset)
    order <- DB.insert(order_changeset)
    return({user, order})
  end)

  # result is {user, order} if successful
  return(result)
end
|> DB.Ecto.with_handler(MyApp.Repo)
|> Comp.run!()

update(input, opts \\ [])

Update a record from a changeset or ChangeEvent.

Returns the updated struct on success, throws on error.

Example

user <- DB.update(User.changeset(user, attrs))

update_all(schema, entries, opts \\ [])

Update multiple records.

For changesets/ChangeEvents: applies each update individually (not truly bulk). For maps with the :query option: uses Repo.update_all with the given query.

Returns {count, nil | [struct]}.

Example

# Update each changeset individually
{count, users} <- DB.update_all(User, changesets, returning: true)

# Bulk update via query
{count, nil} <- DB.update_all(User, [], query: query, set: [active: false])

upsert(input, opts \\ [])

Insert or update a record (upsert) from a changeset or ChangeEvent.

Returns the inserted/updated struct on success, throws on error.

Example

user <- DB.upsert(changeset, conflict_target: :email)

upsert_all(schema, entries, opts \\ [])

Upsert multiple records.

Returns {count, nil | [struct]}.

Example

{count, users} <- DB.upsert_all(User, changesets,
  conflict_target: :email,
  returning: true
)