kura_repo_worker (kura v1.19.2)

View Source

CRUD operations, preloading, transactions, and raw queries.

All functions take a repo module as the first argument and execute against its connection pool.

kura_repo_worker:start(MyRepo),
{ok, Users} = kura_repo_worker:all(MyRepo, kura_query:from(my_user)),
{ok, User} = kura_repo_worker:get(MyRepo, my_user, 1),
{ok, Inserted} = kura_repo_worker:insert(MyRepo, Changeset).

Summary

Functions

<<<<<<< HEAD Run an aggregate function on a query. Supported: count, sum, avg, min, max.

Run an aggregate with a default value when the result is nil/null.

Execute a query and return all matching rows.

Build a log event map from query execution data.

Count all rows matching a query.

Return a default logger function that logs queries via logger:info.

Delete the record referenced by the changeset's data.

Bulk delete all rows matching the query, returning the count of deleted rows.

Check if any record matches the query.

Fetch a single record by primary key.

Fetch a single record matching all key-value clauses.

Insert a record from a changeset. Returns {error, Changeset} with errors on failure.

Bulk insert a list of maps, returning the count of inserted rows.

Bulk insert with options. #{returning => true | [atom()]} returns inserted rows.

Execute a kura_multi pipeline inside a transaction.

Execute a query and return exactly one result (adds LIMIT 1).

Preload associations on one or more records (standalone, outside of queries).

Execute raw SQL with parameters.

Re-fetch a record from the database by its primary key.

Restore a soft-deleted record by clearing its deleted_at field.

Soft-delete a record by setting deleted_at to the current timestamp. The record remains in the database but is excluded from normal queries.

Start the connection pool for the given repo module.

Execute a function inside a database transaction.

Update a record from a changeset. No-op if there are no changes.

Bulk update all rows matching the query, returning the count of affected rows.

Functions

aggregate/3

-spec aggregate(module(),
                #kura_query{from :: atom() | module() | undefined,
                            select :: [atom() | term()] | {exprs, [term()]},
                            wheres :: [term()],
                            joins :: [term()],
                            order_bys :: [term()],
                            group_bys :: [atom()],
                            havings :: [term()],
                            limit :: non_neg_integer() | undefined,
                            offset :: non_neg_integer() | undefined,
                            distinct :: boolean() | [atom()],
                            lock :: binary() | undefined,
                            prefix :: binary() | undefined,
                            preloads :: [atom() | {atom(), list()}],
                            ctes :: [{binary(), #kura_query{}}],
                            combinations :: [{union | union_all | intersect | except, #kura_query{}}],
                            include_deleted :: boolean()},
                count | {count | sum | avg | min | max, atom()}) ->
                   {ok, term()} | {error, term()}.

<<<<<<< HEAD Run an aggregate function on a query. Supported: count, sum, avg, min, max.

{ok, 42} = kura_repo_worker:aggregate(MyRepo, Q, count).
{ok, 150.5} = kura_repo_worker:aggregate(MyRepo, Q, {sum, score}).

aggregate(RepoMod, Query, Agg, Default)

-spec aggregate(module(),
                #kura_query{from :: atom() | module() | undefined,
                            select :: [atom() | term()] | {exprs, [term()]},
                            wheres :: [term()],
                            joins :: [term()],
                            order_bys :: [term()],
                            group_bys :: [atom()],
                            havings :: [term()],
                            limit :: non_neg_integer() | undefined,
                            offset :: non_neg_integer() | undefined,
                            distinct :: boolean() | [atom()],
                            lock :: binary() | undefined,
                            prefix :: binary() | undefined,
                            preloads :: [atom() | {atom(), list()}],
                            ctes :: [{binary(), #kura_query{}}],
                            combinations :: [{union | union_all | intersect | except, #kura_query{}}],
                            include_deleted :: boolean()},
                count | {count | sum | avg | min | max, atom()},
                term()) ->
                   {ok, term()} | {error, term()}.

Run an aggregate with a default value when the result is nil/null.

{ok, 0} = kura_repo_worker:aggregate(MyRepo, Q, {sum, score}, 0).

all(RepoMod, Query)

-spec all(module(),
          #kura_query{from :: atom() | module() | undefined,
                      select :: [atom() | term()] | {exprs, [term()]},
                      wheres :: [term()],
                      joins :: [term()],
                      order_bys :: [term()],
                      group_bys :: [atom()],
                      havings :: [term()],
                      limit :: non_neg_integer() | undefined,
                      offset :: non_neg_integer() | undefined,
                      distinct :: boolean() | [atom()],
                      lock :: binary() | undefined,
                      prefix :: binary() | undefined,
                      preloads :: [atom() | {atom(), list()}],
                      ctes :: [{binary(), #kura_query{}}],
                      combinations :: [{union | union_all | intersect | except, #kura_query{}}],
                      include_deleted :: boolean()}) ->
             {ok, [map()]} | {error, term()}.

Execute a query and return all matching rows.

build_log_event(Repo, SQL, Params, Result, DurationUs)

-spec build_log_event(module(), iodata(), [term()], term(), integer()) -> map().

Build a log event map from query execution data.

build_telemetry_metadata(RepoMod, SQL, Params, Result)

count(RepoMod, Query)

-spec count(module(),
            #kura_query{from :: atom() | module() | undefined,
                        select :: [atom() | term()] | {exprs, [term()]},
                        wheres :: [term()],
                        joins :: [term()],
                        order_bys :: [term()],
                        group_bys :: [atom()],
                        havings :: [term()],
                        limit :: non_neg_integer() | undefined,
                        offset :: non_neg_integer() | undefined,
                        distinct :: boolean() | [atom()],
                        lock :: binary() | undefined,
                        prefix :: binary() | undefined,
                        preloads :: [atom() | {atom(), list()}],
                        ctes :: [{binary(), #kura_query{}}],
                        combinations :: [{union | union_all | intersect | except, #kura_query{}}],
                        include_deleted :: boolean()}) ->
               {ok, non_neg_integer()} | {error, term()}.

Count all rows matching a query.

{ok, 42} = kura_repo_worker:count(MyRepo, kura_query:from(my_schema)).

default_logger()

-spec default_logger() -> fun((map()) -> ok).

Return a default logger function that logs queries via logger:info.

delete/2

-spec delete(module(),
             #kura_changeset{valid :: boolean(),
                             schema :: module() | undefined,
                             data :: map(),
                             params :: map(),
                             changes :: map(),
                             errors :: [{atom(), binary()}],
                             types :: #{atom() => kura_types:kura_type()},
                             required :: [atom()],
                             action :: atom() | undefined,
                             constraints ::
                                 [#kura_constraint{type :: unique | foreign_key | check | exclusion,
                                                   constraint :: binary(),
                                                   field :: atom(),
                                                   message :: binary()}],
                             assoc_changes :: #{atom() => #kura_changeset{} | [#kura_changeset{}]},
                             prepare :: [fun((#kura_changeset{}) -> #kura_changeset{})],
                             optimistic_lock :: atom() | undefined}) ->
                {ok, map()} | {error, term()}.

Delete the record referenced by the changeset's data.

delete_all(RepoMod, Query)

-spec delete_all(module(),
                 #kura_query{from :: atom() | module() | undefined,
                             select :: [atom() | term()] | {exprs, [term()]},
                             wheres :: [term()],
                             joins :: [term()],
                             order_bys :: [term()],
                             group_bys :: [atom()],
                             havings :: [term()],
                             limit :: non_neg_integer() | undefined,
                             offset :: non_neg_integer() | undefined,
                             distinct :: boolean() | [atom()],
                             lock :: binary() | undefined,
                             prefix :: binary() | undefined,
                             preloads :: [atom() | {atom(), list()}],
                             ctes :: [{binary(), #kura_query{}}],
                             combinations :: [{union | union_all | intersect | except, #kura_query{}}],
                             include_deleted :: boolean()}) ->
                    {ok, non_neg_integer()} | {error, term()}.

Bulk delete all rows matching the query, returning the count of deleted rows.

exists(RepoMod, Query)

-spec exists(module(),
             #kura_query{from :: atom() | module() | undefined,
                         select :: [atom() | term()] | {exprs, [term()]},
                         wheres :: [term()],
                         joins :: [term()],
                         order_bys :: [term()],
                         group_bys :: [atom()],
                         havings :: [term()],
                         limit :: non_neg_integer() | undefined,
                         offset :: non_neg_integer() | undefined,
                         distinct :: boolean() | [atom()],
                         lock :: binary() | undefined,
                         prefix :: binary() | undefined,
                         preloads :: [atom() | {atom(), list()}],
                         ctes :: [{binary(), #kura_query{}}],
                         combinations :: [{union | union_all | intersect | except, #kura_query{}}],
                         include_deleted :: boolean()}) ->
                {ok, boolean()} | {error, term()}.

Check if any record matches the query.

extract_source(SQL)

get(RepoMod, SchemaMod, Id)

-spec get(module(), module(), term()) -> {ok, map()} | {error, not_found} | {error, term()}.

Fetch a single record by primary key.

get_by(RepoMod, SchemaMod, Clauses)

-spec get_by(module(), module(), [{atom(), term()}]) ->
                {ok, map()} | {error, not_found} | {error, term()}.

Fetch a single record matching all key-value clauses.

insert/2

-spec insert(module(),
             #kura_changeset{valid :: boolean(),
                             schema :: module() | undefined,
                             data :: map(),
                             params :: map(),
                             changes :: map(),
                             errors :: [{atom(), binary()}],
                             types :: #{atom() => kura_types:kura_type()},
                             required :: [atom()],
                             action :: atom() | undefined,
                             constraints ::
                                 [#kura_constraint{type :: unique | foreign_key | check | exclusion,
                                                   constraint :: binary(),
                                                   field :: atom(),
                                                   message :: binary()}],
                             assoc_changes :: #{atom() => #kura_changeset{} | [#kura_changeset{}]},
                             prepare :: [fun((#kura_changeset{}) -> #kura_changeset{})],
                             optimistic_lock :: atom() | undefined}) ->
                {ok, map()} |
                {error,
                 #kura_changeset{valid :: boolean(),
                                 schema :: module() | undefined,
                                 data :: map(),
                                 params :: map(),
                                 changes :: map(),
                                 errors :: [{atom(), binary()}],
                                 types :: #{atom() => kura_types:kura_type()},
                                 required :: [atom()],
                                 action :: atom() | undefined,
                                 constraints ::
                                     [#kura_constraint{type :: unique | foreign_key | check | exclusion,
                                                       constraint :: binary(),
                                                       field :: atom(),
                                                       message :: binary()}],
                                 assoc_changes :: #{atom() => #kura_changeset{} | [#kura_changeset{}]},
                                 prepare :: [fun((#kura_changeset{}) -> #kura_changeset{})],
                                 optimistic_lock :: atom() | undefined}}.

Insert a record from a changeset. Returns {error, Changeset} with errors on failure.

insert/3

-spec insert(module(),
             #kura_changeset{valid :: boolean(),
                             schema :: module() | undefined,
                             data :: map(),
                             params :: map(),
                             changes :: map(),
                             errors :: [{atom(), binary()}],
                             types :: #{atom() => kura_types:kura_type()},
                             required :: [atom()],
                             action :: atom() | undefined,
                             constraints ::
                                 [#kura_constraint{type :: unique | foreign_key | check | exclusion,
                                                   constraint :: binary(),
                                                   field :: atom(),
                                                   message :: binary()}],
                             assoc_changes :: #{atom() => #kura_changeset{} | [#kura_changeset{}]},
                             prepare :: [fun((#kura_changeset{}) -> #kura_changeset{})],
                             optimistic_lock :: atom() | undefined},
             map()) ->
                {ok, map()} |
                {error,
                 #kura_changeset{valid :: boolean(),
                                 schema :: module() | undefined,
                                 data :: map(),
                                 params :: map(),
                                 changes :: map(),
                                 errors :: [{atom(), binary()}],
                                 types :: #{atom() => kura_types:kura_type()},
                                 required :: [atom()],
                                 action :: atom() | undefined,
                                 constraints ::
                                     [#kura_constraint{type :: unique | foreign_key | check | exclusion,
                                                       constraint :: binary(),
                                                       field :: atom(),
                                                       message :: binary()}],
                                 assoc_changes :: #{atom() => #kura_changeset{} | [#kura_changeset{}]},
                                 prepare :: [fun((#kura_changeset{}) -> #kura_changeset{})],
                                 optimistic_lock :: atom() | undefined}}.

insert_all(RepoMod, SchemaMod, Entries)

-spec insert_all(module(), module(), [map()]) -> {ok, non_neg_integer()} | {error, term()}.

Bulk insert a list of maps, returning the count of inserted rows.

insert_all(RepoMod, SchemaMod, Entries, Opts)

-spec insert_all(module(), module(), [map()], map()) ->
                    {ok, non_neg_integer()} | {ok, non_neg_integer(), [map()]} | {error, term()}.

Bulk insert with options. #{returning => true | [atom()]} returns inserted rows.

multi(RepoMod, Multi)

-spec multi(module(), term()) -> {ok, map()} | {error, atom(), term(), map()}.

Execute a kura_multi pipeline inside a transaction.

one(RepoMod, Query)

-spec one(module(),
          #kura_query{from :: atom() | module() | undefined,
                      select :: [atom() | term()] | {exprs, [term()]},
                      wheres :: [term()],
                      joins :: [term()],
                      order_bys :: [term()],
                      group_bys :: [atom()],
                      havings :: [term()],
                      limit :: non_neg_integer() | undefined,
                      offset :: non_neg_integer() | undefined,
                      distinct :: boolean() | [atom()],
                      lock :: binary() | undefined,
                      prefix :: binary() | undefined,
                      preloads :: [atom() | {atom(), list()}],
                      ctes :: [{binary(), #kura_query{}}],
                      combinations :: [{union | union_all | intersect | except, #kura_query{}}],
                      include_deleted :: boolean()}) ->
             {ok, map()} | {error, not_found} | {error, term()}.

Execute a query and return exactly one result (adds LIMIT 1).

pgo_query(RepoMod, SQL, Params)

preload(RepoMod, Schema, Records, Assocs)

-spec preload(module(), module(), map() | [map()], [atom() | {atom(), list()}]) -> map() | [map()].

Preload associations on one or more records (standalone, outside of queries).

query(RepoMod, SQL, Params)

-spec query(module(), iodata(), [term()]) -> {ok, list()} | {error, term()}.

Execute raw SQL with parameters.

reload(RepoMod, SchemaMod, Record)

-spec reload(module(), module(), map()) -> {ok, map()} | {error, term()}.

Re-fetch a record from the database by its primary key.

restore/2

-spec restore(module(),
              #kura_changeset{valid :: boolean(),
                              schema :: module() | undefined,
                              data :: map(),
                              params :: map(),
                              changes :: map(),
                              errors :: [{atom(), binary()}],
                              types :: #{atom() => kura_types:kura_type()},
                              required :: [atom()],
                              action :: atom() | undefined,
                              constraints ::
                                  [#kura_constraint{type :: unique | foreign_key | check | exclusion,
                                                    constraint :: binary(),
                                                    field :: atom(),
                                                    message :: binary()}],
                              assoc_changes :: #{atom() => #kura_changeset{} | [#kura_changeset{}]},
                              prepare :: [fun((#kura_changeset{}) -> #kura_changeset{})],
                              optimistic_lock :: atom() | undefined}) ->
                 {ok, map()} | {error, term()}.

Restore a soft-deleted record by clearing its deleted_at field.

soft_delete/2

-spec soft_delete(module(),
                  #kura_changeset{valid :: boolean(),
                                  schema :: module() | undefined,
                                  data :: map(),
                                  params :: map(),
                                  changes :: map(),
                                  errors :: [{atom(), binary()}],
                                  types :: #{atom() => kura_types:kura_type()},
                                  required :: [atom()],
                                  action :: atom() | undefined,
                                  constraints ::
                                      [#kura_constraint{type :: unique | foreign_key | check | exclusion,
                                                        constraint :: binary(),
                                                        field :: atom(),
                                                        message :: binary()}],
                                  assoc_changes :: #{atom() => #kura_changeset{} | [#kura_changeset{}]},
                                  prepare :: [fun((#kura_changeset{}) -> #kura_changeset{})],
                                  optimistic_lock :: atom() | undefined}) ->
                     {ok, map()} | {error, term()}.

Soft-delete a record by setting deleted_at to the current timestamp. The record remains in the database but is excluded from normal queries.

Returns {error, not_soft_deletable} if the schema has no deleted_at field.

start(RepoMod)

-spec start(module()) -> ok.

Start the connection pool for the given repo module.

transaction(RepoMod, Fun)

-spec transaction(module(), fun(() -> term())) -> term().

Execute a function inside a database transaction.

update/2

-spec update(module(),
             #kura_changeset{valid :: boolean(),
                             schema :: module() | undefined,
                             data :: map(),
                             params :: map(),
                             changes :: map(),
                             errors :: [{atom(), binary()}],
                             types :: #{atom() => kura_types:kura_type()},
                             required :: [atom()],
                             action :: atom() | undefined,
                             constraints ::
                                 [#kura_constraint{type :: unique | foreign_key | check | exclusion,
                                                   constraint :: binary(),
                                                   field :: atom(),
                                                   message :: binary()}],
                             assoc_changes :: #{atom() => #kura_changeset{} | [#kura_changeset{}]},
                             prepare :: [fun((#kura_changeset{}) -> #kura_changeset{})],
                             optimistic_lock :: atom() | undefined}) ->
                {ok, map()} |
                {error,
                 #kura_changeset{valid :: boolean(),
                                 schema :: module() | undefined,
                                 data :: map(),
                                 params :: map(),
                                 changes :: map(),
                                 errors :: [{atom(), binary()}],
                                 types :: #{atom() => kura_types:kura_type()},
                                 required :: [atom()],
                                 action :: atom() | undefined,
                                 constraints ::
                                     [#kura_constraint{type :: unique | foreign_key | check | exclusion,
                                                       constraint :: binary(),
                                                       field :: atom(),
                                                       message :: binary()}],
                                 assoc_changes :: #{atom() => #kura_changeset{} | [#kura_changeset{}]},
                                 prepare :: [fun((#kura_changeset{}) -> #kura_changeset{})],
                                 optimistic_lock :: atom() | undefined}}.

Update a record from a changeset. No-op if there are no changes.

update_all(RepoMod, Query, Updates)

-spec update_all(module(),
                 #kura_query{from :: atom() | module() | undefined,
                             select :: [atom() | term()] | {exprs, [term()]},
                             wheres :: [term()],
                             joins :: [term()],
                             order_bys :: [term()],
                             group_bys :: [atom()],
                             havings :: [term()],
                             limit :: non_neg_integer() | undefined,
                             offset :: non_neg_integer() | undefined,
                             distinct :: boolean() | [atom()],
                             lock :: binary() | undefined,
                             prefix :: binary() | undefined,
                             preloads :: [atom() | {atom(), list()}],
                             ctes :: [{binary(), #kura_query{}}],
                             combinations :: [{union | union_all | intersect | except, #kura_query{}}],
                             include_deleted :: boolean()},
                 map()) ->
                    {ok, non_neg_integer()} | {error, term()}.

Bulk update all rows matching the query, returning the count of affected rows.