Ash.DataLayer behaviour (ash v3.4.47)

The behaviour for backing resource actions with persistence layers.

Summary

Functions

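data_layer(resource)

The data layer of the resource, or nil if it does not have one

data_layer_can?(resource, feature)

Whether or not the data layer supports a specific feature

data_layer_functions(resource)

Custom functions supported by the data layer of the resource

prefer_lateral_join_for_many_to_many?(data_layer)

Whether or not lateral joins should be used for many to many relationships by default

rollback(resource, term)

Rolls back the current transaction

transaction(resource_or_resources, func, timeout \\ nil, reason \\ %{type: :custom, metadata: %{}})

Wraps the execution of the function in a transaction with the resource's data_layer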

Types

bulk_create_options()

@type bulk_create_options() :: %{
  batch_size: pos_integer(),
  return_records?: boolean(),
  upsert?: boolean(),
  action_select: [atom()],
  upsert_keys: nil | [atom()],
  upsert_condition: Ash.Expr.t() | nil,
  identity: Ash.Resource.Identity.t() | nil,
  select: [atom()],
  upsert_fields:
    nil
    | [atom()]
    | :replace_all
    | {:replace, [atom()]}
    | {:replace_all_except, [atom()]},
  tenant: term()
}

bulk_update_options()

@type bulk_update_options() :: %{
  return_records?: boolean(),
  action_select: [atom()],
  calculations: [{Ash.Query.Calculation.t(), Ash.Expr.t()}],
  select: [atom()],
  tenant: term()
}

data_layer_query()

@type data_layer_query() :: struct()

feature()

@type feature() ::
  :transact
  | :multitenancy
  | {:atomic, :update}
  | {:atomic, :upsert}
  | {:lateral_join, [Ash.Resource.t()]}
  | {:join, Ash.Resource.t()}
  | {:aggregate, Ash.Query.Aggregate.kind()}
  | {:aggregate_relationship, Ash.Resource.Relationships.relationship()}
  | {:query_aggregate, Ash.Query.Aggregate.kind()}
  | :select
  | :expr_error
  | :expression_calculation_sort
  | :aggregate_filter
  | :aggregate_sort
  | :boolean_filter
  | :async_engine
  | :bulk_create
  | :update_query
  | :destroy_query
  | :create
  | :read
  | :update
  | :destroy
  | :limit
  | :offset
  | :filter
  | :composite_type
  | {:lock, lock_type()}
  | {:filter_expr, struct()}
  | {:filter_relationship, Ash.Resource.Relationships.relationship()}
  | :sort
  | {:sort, Ash.Type.t()}
  | :upsert
  | :composite_primary_key

lateral_join_link()

@type lateral_join_link() ::
  {Ash.Resource.t(), atom(), atom(), Ash.Resource.Relationships.relationship()}

lock_type()

@type lock_type() :: :for_update | term()

t()

@type t() :: module()

transaction_reason()

@type transaction_reason() ::
  %{
    :type => :create,
    :metadata => %{resource: Ash.Resource.t(), action: atom()},
    optional(:data_layer_context) => %{}
  }
  | %{
      :type => :update,
      :metadata => %{
        resource: Ash.Resource.t(),
        action: atom(),
        record: Ash.Resource.record(),
        actor: term()
      },
      optional(:data_layer_context) => %{}
    }
  | %{
      :type => :destroy,
      :metadata => %{
        resource: Ash.Resource.t(),
        action: atom(),
        record: Ash.Resource.record(),
        actor: term()
      },
      optional(:data_layer_context) => %{}
    }
  | %{
      :type => :read,
      :metadata => %{
        resource: Ash.Resource.t(),
        query: Ash.Query.t(),
        actor: term()
      },
      optional(:data_layer_context) => %{}
    }
  | %{
      :type => :flow_transaction,
      :metadata => %{
        resource: Ash.Resource.t(),
        input: Ash.ActionInput.t(),
        action: atom(),
        actor: term()
      },
      optional(:data_layer_context) => %{}
    }
  | %{
      :type => :generic,
      :metadata => %{
        step_name: atom() | [term()],
        flow: module(),
        actor: term()
      },
      optional(:data_layer_context) => %{}
    }
  | %{type: :custom, metadata: map()}
  | %{type: atom(), metadata: map()}

Callbacks

add_aggregate(data_layer_query, t, t)

(optional)
@callback add_aggregate(
  data_layer_query(),
  Ash.Query.Aggregate.t(),
  Ash.Resource.t()
) :: {:ok, data_layer_query()} | {:error, term()}

add_aggregates(data_layer_query, list, t)

(optional)
@callback add_aggregates(
  data_layer_query(),
  [Ash.Query.Aggregate.t()],
  Ash.Resource.t()
) :: {:ok, data_layer_query()} | {:error, term()}

add_calculation(data_layer_query, t, expression, t)

(optional)
@callback add_calculation(
  data_layer_query(),
  Ash.Query.Calculation.t(),
  expression :: any(),
  Ash.Resource.t()
) :: {:ok, data_layer_query()} | {:error, term()}

add_calculations(data_layer_query, list, t)

(optional)
@callback add_calculations(
  data_layer_query(),
  [{Ash.Query.Calculation.t(), expression :: any()}],
  Ash.Resource.t()
) :: {:ok, data_layer_query()} | {:error, term()}

bulk_create(t, t, options)

(optional)
@callback bulk_create(
  Ash.Resource.t(),
  Enumerable.t(Ash.Changeset.t()),
  options :: bulk_create_options()
) ::
  :ok
  | {:ok, Enumerable.t(Ash.Resource.record())}
  | {:error, Ash.Error.t()}
  | {:error, :no_rollback, Ash.Error.t()}

calculate(t, list, context)

(optional)
@callback calculate(Ash.Resource.t(), [Ash.Expr.t()], context :: map()) ::
  {:ok, term()} | {:error, term()}

can?(arg1, feature)

@callback can?(Ash.Resource.t() | Spark.Dsl.t(), feature()) :: boolean()

create(t, t)

(optional)
@callback create(Ash.Resource.t(), Ash.Changeset.t()) ::
  {:ok, Ash.Resource.record()}
  | {:error, term()}
  | {:error, :no_rollback, term()}

destroy(t, t)

(optional)
@callback destroy(Ash.Resource.t(), Ash.Changeset.t()) :: :ok | {:error, term()}

destroy_query(data_layer_query, t, t, opts)

(optional)
@callback destroy_query(
  data_layer_query(),
  Ash.Changeset.t(),
  Ash.Resource.t(),
  opts :: bulk_update_options()
) ::
  :ok
  | {:ok, Enumerable.t(Ash.Resource.record())}
  | {:error, Ash.Error.t()}
  | {:error, :no_rollback, Ash.Error.t()}

distinct(data_layer_query, list, resource)

(optional)
@callback distinct(data_layer_query(), [atom()], resource :: Ash.Resource.t()) ::
  {:ok, data_layer_query()} | {:error, term()}

distinct_sort(data_layer_query, t, resource)

(optional)
@callback distinct_sort(data_layer_query(), Ash.Sort.t(), resource :: Ash.Resource.t()) ::
  {:ok, data_layer_query()} | {:error, term()}

filter(data_layer_query, t, resource)

(optional)
@callback filter(data_layer_query(), Ash.Filter.t(), resource :: Ash.Resource.t()) ::
  {:ok, data_layer_query()} | {:error, term()}

functions(t)

(optional)
@callback functions(Ash.Resource.t()) :: [module()]

in_transaction?(t)

(optional)
@callback in_transaction?(Ash.Resource.t()) :: boolean()

limit(data_layer_query, limit, resource)

(optional)
@callback limit(
  data_layer_query(),
  limit :: non_neg_integer(),
  resource :: Ash.Resource.t()
) :: {:ok, data_layer_query()} | {:error, term()}

lock(data_layer_query, lock_type, resource)

(optional)
@callback lock(data_layer_query(), lock_type(), resource :: Ash.Resource.t()) ::
  {:ok, data_layer_query()} | {:error, term()}

offset(data_layer_query, offset, resource)

(optional)
@callback offset(
  data_layer_query(),
  offset :: non_neg_integer(),
  resource :: Ash.Resource.t()
) :: {:ok, data_layer_query()} | {:error, term()}

prefer_lateral_join_for_many_to_many?()

(optional)
@callback prefer_lateral_join_for_many_to_many?() :: boolean()

prefer_transaction?(t)

(optional)
@callback prefer_transaction?(Ash.Resource.t()) :: boolean()

prefer_transaction_for_atomic_updates?(t)

(optional)
@callback prefer_transaction_for_atomic_updates?(Ash.Resource.t()) :: boolean()

resource_to_query(t, t)

@callback resource_to_query(Ash.Resource.t(), Ash.Domain.t()) :: data_layer_query()

return_query(data_layer_query, t)

(optional)
@callback return_query(data_layer_query(), Ash.Resource.t()) ::
  {:ok, data_layer_query()} | {:error, term()}

rollback(t, term)

(optional)
@callback rollback(Ash.Resource.t(), term()) :: no_return()

run_aggregate_query(data_layer_query, list, t)

(optional)
@callback run_aggregate_query(
  data_layer_query(),
  [Ash.Query.Aggregate.t()],
  Ash.Resource.t()
) :: {:ok, map()} | {:error, term()}

run_aggregate_query_with_lateral_join(data_layer_query, list, list, destination_resource, list)

(optional)
@callback run_aggregate_query_with_lateral_join(
  data_layer_query(),
  [Ash.Query.Aggregate.t()],
  [Ash.Resource.record()],
  destination_resource :: Ash.Resource.t(),
  [lateral_join_link()]
) :: {:ok, [Ash.Resource.t()]} | {:error, term()}

run_query(data_layer_query, t)

(optional)
@callback run_query(data_layer_query(), Ash.Resource.t()) ::
  {:ok, [Ash.Resource.record()]}
  | {:error, term()}
  | {:error, :no_rollback, term()}

run_query_with_lateral_join(data_layer_query, list, source_resource, list)

(optional)
@callback run_query_with_lateral_join(
  data_layer_query(),
  [Ash.Resource.record()],
  source_resource :: Ash.Resource.t(),
  [lateral_join_link()]
) :: {:ok, [Ash.Resource.record()]} | {:error, term()}

select(data_layer_query, select, resource)

(optional)
@callback select(
  data_layer_query(),
  select :: [atom()],
  resource :: Ash.Resource.t()
) :: {:ok, data_layer_query()} | {:error, term()}

set_context(t, data_layer_query, map)

(optional)
@callback set_context(Ash.Resource.t(), data_layer_query(), map()) ::
  {:ok, data_layer_query()} | {:error, term()}

set_tenant(t, data_layer_query, term)

(optional)
@callback set_tenant(Ash.Resource.t(), data_layer_query(), term()) ::
  {:ok, data_layer_query()} | {:error, term()}

sort(data_layer_query, t, resource)

(optional)
@callback sort(data_layer_query(), Ash.Sort.t(), resource :: Ash.Resource.t()) ::
  {:ok, data_layer_query()} | {:error, term()}

source(t)

(optional)
@callback source(Ash.Resource.t()) :: String.t()

transaction(t, function, arg3, reason)

(optional)
@callback transaction(
  Ash.Resource.t(),
  (-> term()),
  nil | pos_integer(),
  reason :: transaction_reason()
) :: {:ok, term()} | {:error, term()}

transform_query(t)

(optional)
@callback transform_query(Ash.Query.t()) :: Ash.Query.t()

update(t, t)

(optional)
@callback update(Ash.Resource.t(), Ash.Changeset.t()) ::
  {:ok, Ash.Resource.record()}
  | {:error, term()}
  | {:error, :no_rollback, term()}

update_query(data_layer_query, t, t, opts)

(optional)
@callback update_query(
  data_layer_query(),
  Ash.Changeset.t(),
  Ash.Resource.t(),
  opts :: bulk_update_options()
) ::
  :ok
  | {:ok, Enumerable.t(Ash.Resource.record())}
  | {:error, Ash.Error.t()}
  | {:error, :no_rollback, Ash.Error.t()}

upsert(t, t, list)

(optional)
@callback upsert(Ash.Resource.t(), Ash.Changeset.t(), [atom()]) ::
  {:ok, Ash.Resource.record()}
  | {:error, term()}
  | {:error, :no_rollback, term()}

upsert(t, t, list, arg4)

(optional)
@callback upsert(
  Ash.Resource.t(),
  Ash.Changeset.t(),
  [atom()],
  Ash.Resource.Identity.t() | nil
) ::
  {:ok,
   Ash.Resource.record()
   | {:upsert_skipped, Ash.Query.t(),
      (-> {:ok, Ash.Resource.record()}
          | {:error, term()}
          | {:error, :no_rollback, term()})}}
  | {:error, term()}
  | {:error, :no_rollback, term()}

Functions

add_aggregates(query, aggregates, resource)

@spec add_aggregates(data_layer_query(), [Ash.Query.Aggregate.t()], Ash.Resource.t()) ::
  {:ok, data_layer_query()} | {:error, term()}

add_calculations(query, calculations, resource)

@spec add_calculations(
  data_layer_query(),
  [{Ash.Query.Calculation.t(), expression :: term()}],
  Ash.Resource.t()
) :: {:ok, data_layer_query()} | {:error, term()}

bulk_create(resource, changesets, options)

@spec bulk_create(
  Ash.Resource.t(),
  Enumerable.t(Ash.Changeset.t()),
  options :: bulk_create_options()
) ::
  :ok
  | {:ok, Enumerable.t(Ash.Resource.record())}
  | {:error, Ash.Error.t()}
  | {:error, :no_rollback, Ash.Error.t()}

calculate(resource, exprs, context)

@spec calculate(Ash.Resource.t(), [Ash.Expr.t()], context :: map()) ::
  {:ok, [term()]} | {:error, Ash.Error.t()}

can?(feature, resource)

@spec can?(feature(), Ash.Resource.t() | Spark.Dsl.t()) :: boolean()

create(resource, changeset)

@spec create(Ash.Resource.t(), Ash.Changeset.t()) ::
  {:ok, Ash.Resource.record()}
  | {:error, term()}
  | {:error, :no_rollback, term()}

data_layer(resource)

@spec data_layer(Ash.Resource.t() | Spark.Dsl.t()) :: t() | nil

The data layer of the resource, or nil if it does not have one

data_layer_can?(resource, feature)

@spec data_layer_can?(Ash.Resource.t() | Spark.Dsl.t(), feature()) :: boolean()

Whether or not the data layer supports a specific feature

data_layer_functions(resource)

@spec data_layer_functions(Ash.Resource.t()) :: map()

Custom functions supported by the data layer of the resource

destroy(resource, changeset)

@spec destroy(Ash.Resource.t(), Ash.Changeset.t()) ::
  :ok | {:error, term()} | {:error, :no_rollback, term()}

destroy_query(query, changeset, opts)

@spec destroy_query(
  data_layer_query(),
  Ash.Changeset.t(),
  opts :: bulk_update_options()
) ::
  :ok
  | {:ok, Enumerable.t(Ash.Resource.record())}
  | {:error, Ash.Error.t()}
  | {:error, :no_rollback, Ash.Error.t()}

distinct(query, distinct, resource)

@spec distinct(data_layer_query(), Ash.Sort.t(), Ash.Resource.t()) ::
  {:ok, data_layer_query()} | {:error, term()}

distinct_sort(query, sort, resource)

@spec distinct_sort(data_layer_query(), Ash.Sort.t(), Ash.Resource.t()) ::
  {:ok, data_layer_query()} | {:error, term()}

filter(query, filter, resource)

@spec filter(data_layer_query(), Ash.Filter.t(), Ash.Resource.t()) ::
  {:ok, data_layer_query()} | {:error, term()}

functions(resource)

in_transaction?(resource)

limit(query, limit, resource)

@spec limit(data_layer_query(), limit :: non_neg_integer(), Ash.Resource.t()) ::
  {:ok, data_layer_query()} | {:error, term()}

lock(query, lock_type, resource)

@spec lock(
  data_layer_query(),
  lock_type :: lock_type() | nil,
  resource :: Ash.Resource.t()
) ::
  {:ok, data_layer_query()} | {:error, term()}

offset(query, offset, resource)

@spec offset(data_layer_query(), offset :: non_neg_integer(), Ash.Resource.t()) ::
  {:ok, data_layer_query()} | {:error, term()}

prefer_lateral_join_for_many_to_many?(data_layer)

@spec prefer_lateral_join_for_many_to_many?(t()) :: boolean()

Whether or not lateral joins should be used for many to many relationships by default

prefer_transaction?(resource)

@spec prefer_transaction?(Ash.Resource.t()) :: boolean()

prefer_transaction_for_atomic_updates?(resource)

@spec prefer_transaction_for_atomic_updates?(Ash.Resource.t()) :: boolean()

resource_to_query(resource, domain)

@spec resource_to_query(Ash.Resource.t(), Ash.Domain.t()) :: data_layer_query()

return_query(query, resource)

@spec return_query(data_layer_query(), Ash.Resource.t()) ::
  {:ok, data_layer_query()} | {:error, term()}

rollback(resource, term)

@spec rollback(Ash.Resource.t() | [Ash.Resource.t()], term()) :: no_return()

Rolls back the current transaction

run_aggregate_query(query, aggregates, resource)

@spec run_aggregate_query(
  data_layer_query(),
  [Ash.Query.Aggregate.t()],
  Ash.Resource.t()
) :: {:ok, map()} | {:error, term()}

run_aggregate_query_with_lateral_join(query, aggregates, root_data, destination_resource, path)

run_query(query, central_resource)

@spec run_query(data_layer_query(), central_resource :: Ash.Resource.t()) ::
  {:ok, [Ash.Resource.record()]}
  | {:error, term()}
  | {:error, :no_rollback, term()}

run_query_with_lateral_join(query, root_data, destination_resource, path)

select(query, select, resource)

@spec select(data_layer_query(), select :: [atom()], Ash.Resource.t()) ::
  {:ok, data_layer_query()} | {:error, term()}

set_context(resource, query, map)

@spec set_context(Ash.Resource.t(), data_layer_query(), map()) ::
  {:ok, data_layer_query()} | {:error, term()}

set_tenant(resource, query, term)

@spec set_tenant(Ash.Resource.t(), data_layer_query(), term()) ::
  {:ok, data_layer_query()} | {:error, term()}

sort(query, sort, resource)

@spec sort(data_layer_query(), Ash.Sort.t(), Ash.Resource.t()) ::
  {:ok, data_layer_query()} | {:error, term()}

source(resource)

@spec source(Ash.Resource.t()) :: String.t()

transaction(resource_or_resources, func, timeout \\ nil, reason \\ %{type: :custom, metadata: %{}})

@spec transaction(
  Ash.Resource.t() | [Ash.Resource.t()],
  (-> term()),
  nil | pos_integer(),
  reason :: transaction_reason()
) :: term()

Wraps the execution of the function in a transaction with the resource's data_layer

transform_query(query)

update(resource, changeset)

@spec update(Ash.Resource.t(), Ash.Changeset.t()) ::
  {:ok, Ash.Resource.record()}
  | {:error, term()}
  | {:error, :no_rollback, term()}

update_query(query, changeset, opts)

@spec update_query(
  data_layer_query(),
  Ash.Changeset.t(),
  opts :: bulk_update_options()
) ::
  :ok
  | {:ok, Enumerable.t(Ash.Resource.record())}
  | {:error, Ash.Error.t()}
  | {:error, :no_rollback, Ash.Error.t()}

upsert(resource, changeset, keys, identity \\ nil)

@spec upsert(
  Ash.Resource.t(),
  Ash.Changeset.t(),
  [atom()],
  identity :: Ash.Resource.Identity.t() | nil
) ::
  {:ok,
   Ash.Resource.record()
   | {:upsert_skipped, Ash.Query.t(),
      (-> {:ok, Ash.Resource.record()}
          | {:error, term()}
          | {:error, :no_rollback, term()})}}
  | {:error, term()}
  | {:error, :no_rollback, term()}