View Source EctoTablestore.Repo behaviour (ecto_tablestore v0.15.1)

Defines a repository for Tablestore.

A repository maps to an underlying data store, controlled by Ecto.Adapters.Tablestore adapter.

When used, the repository expects the :otp_app option, and uses Ecto.Adapters.Tablestore by default. The :otp_app should point to an OTP application that has repository configuration. For example, the repository:

defmodule EctoTablestore.MyRepo do
  use EctoTablestore.Repo,
    otp_app: :my_otp_app
end

Configure ex_aliyun_ots as usual:

config :ex_aliyun_ots, MyInstance,
  name: "MY_INSTANCE_NAME",
  endpoint: "MY_INSTANCE_ENDPOINT",
  access_key_id: "MY_OTS_ACCESS_KEY",
  access_key_secret: "MY_OTS_ACCESS_KEY_SECRET"

config :ex_aliyun_ots,
  instances: [MyInstance]

Add the following configuration to associate MyRepo with the previous configuration of ex_aliyun_ots:

config :my_otp_app, EctoTablestore.MyRepo,
  instance: MyInstance

Summary

Query API

Batch get several rows of data from one or more tables, this batch request put multiple get_row in one request from client's perspective.

Fetch a single struct from tablestore where the whole primary key(s) match the given ids.

Get multiple structs by range to the schema, result in :forward direction by default.

Get multiple structs by range from one table, rely on the conjunction of the partition key and other primary key(s).

Similar to get/3, please ensure schema entity has been filled with the whole primary key(s).

See one/2 for more details.

Provide search index features for the following scenarios

Use SQL query data, only support select query.

As a wrapper built on stream_range/4 to create composable and lazy enumerables stream for iteration.

As a wrapper built on ExAliyunOts.stream_range/5 to create composable and lazy enumerables stream for iteration.

As a wrapper built on ExAliyunOts.stream_search/4 to create composable and lazy enumerables stream for iteration.

Schema API

Batch write several rows of data from one or more tables, this batch request puts multiple PutRow/DeleteRow/UpdateRow operations in one request from client's perspective.

Delete a struct using its primary key.

Inserts a struct defined via EctoTablestore.Schema or a changeset.

Updates a changeset using its primary key.

Transaction API

Returns true if the current process is inside a transaction.

Rolls back the current transaction.

Adapt to use the Tablestore's local transaction to perform read and write operations on the data, the feature can perform atomic operations to read and write one or more rows.

Runtime API

Returns the adapter tied to the repository.

Query API

@callback batch_get(batch_gets()) :: {:ok, Keyword.t()} | {:error, term()}

Batch get several rows of data from one or more tables, this batch request put multiple get_row in one request from client's perspective.

After execute each operation in servers, return results independently and independently consumes capacity units.

When input schema_entity, only theirs primary keys are used in query, if need to use theirs attribute columns into condition of query, please use entity_full_match: true option to do that.

Example

When the schema has only 1 primary key, and you use default options:

# get_row style
batch_get([
  {Schema1, [[{"id", 1}], [{"id", 2}]]},
  {Schema2, [[{"id", 3}], [{"id", 4}]]}
])

# get_row style
# if only get one row from one schema could also be:
batch_get([
  {Schema1, [{"id", 1}]},
  {Schema2, [{"id", 3}]}
])

# schema_entity style
batch_get([
  [%Schema1{id: 1}, %Schema1{id: 2}],
  [%Schema2{id: 3}, %Schema2{id: 4}]
])

When the schema has only 1 primary key, and you use custom options:

# get_row style
batch_get([
  {Schema1, [{"id", 1}], [{"id", 2}], columns_to_get: ["field1"]},
  {Schema2, [{"id", 3}], [{"id", 4}], columns_to_get: ["field2"]}
])

# schema_entity style
batch_get([
  {
    [%Schema1{id: 1}, %Schema1{id: 2}],
    columns_to_get: ["field1"]
  },
  {
    [%Schema2{id: 3}, %Schema2{id: 4}],
    columns_to_get: ["field2"]
  }
])

Entity full match:

# only schema_entity style
batch_get([
  {
    [%Schema1{id: 1, field1: 10}, %Schema1{id: 2, field1: 20}],
    entity_full_match: true
  },
  {
    [%Schema2{id: 3, field2: 30}, %Schema2{id: 4, field2: 40}],
    entity_full_match: true
  }
])

When the schema has multiple primary keys:

# get_row style
batch_get([
  {Schema3, [
    [{"id", 1}, {"another_pk", "example1"}],
    [{"id", 2}, {"another_pk", "example2"}]
  ]},
  {Schema4, [
    [{"id", 3}, {"another_pk", "example3"}],
    [{"id", 4}, {"another_pk", "example4"}]
  ]}
])

# schema_entity style
batch_get([
  [
    %Schema3{id: 1, another_pk: "example1"},
    %Schema3{id: 2, another_pk: "example2"}
  ],
  [
    %Schema4{id: 3, another_pk: "example3"},
    %Schema4{id: 4, another_pk: "example4"}
  ]
])
Link to this callback

get(schema, ids, options)

View Source
@callback get(schema(), ids :: list(), options()) :: schema() | {:error, term()} | nil

Fetch a single struct from tablestore where the whole primary key(s) match the given ids.

Options

  • :columns_to_get, string list, return the specified attribute columns, if not specify this option field, will try to return all attribute columns together.

  • :start_column, string, used as a starting column for Wide Column read, the return result contains this as starter.

  • :end_column, string, used as a ending column for Wide Column read, the return result DON NOT contain this column.

  • :filter, used as a filter by condition, support ">", "<", ">=", "<=", "==", "and", "or" and "()" expressions.

    The ignore_if_missing option can be used for the non-existed attribute column, for example:

    An attribute column does not exist meanwhile set it as true, will ignore this match condition in the return result;

    An existed attribute column DOES NOT suit for this use case, the match condition will always affect the return result, if match condition does not satisfy, they won't be return in result.

      filter: filter(({"name", ignore_if_missing: true} == var_name and "age" > 1) or ("class" == "1"))
  • :transaction_id, read under local transaction in a partition key.

Link to this callback

get_range(schema, options)

View Source
@callback get_range(schema(), options()) ::
  {list() | nil, next_token()} | {:error, term()}

Get multiple structs by range to the schema, result in :forward direction by default.

Example

Simply get result in a range request:

Repo.get_range(Order)
Repo.get_range(Order, direction: :backward)

If the Order schema has multiple primary keys, and the :id field is the partition key, we can get records in range under the partition key "1" in this way:

Repo.get_range(%Order{id: "1"})

Options

See get_range/4.

Link to this callback

get_range(schema, start_primary_keys, end_primary_keys, options)

View Source
@callback get_range(schema(), start_primary_keys(), end_primary_keys(), options()) ::
  {list() | nil, next_token()} | {:error, term()}

Get multiple structs by range from one table, rely on the conjunction of the partition key and other primary key(s).

Options

  • :direction, by default it is :forward, set it as :forward to make the order of the query result in ascending by primary key(s), set it as :backward to make the order of the query result in descending by primary key(s).

  • :columns_to_get, string list, return the specified attribute columns, if not specify this field all attribute columns will be return.

  • :start_column, string, used as a starting column for Wide Column read, the return result contains this as starter.

  • :end_column, string, used as a ending column for Wide Column read, the return result DON NOT contain this column.

  • :limit, optional, the maximum number of rows of data to be returned, this value must be greater than 0, whether this option is set or not, there returns a maximum of 5,000 data rows and the total data size never exceeds 4 MB.

  • :transaction_id, read under local transaction in a partition key.

  • :filter, used as a filter by condition, support ">", "<", ">=", "<=", "==", "and", "or" and "()" expressions.

    The ignore_if_missing option can be used for the non-existed attribute column, for example:

    An attribute column does not exist meanwhile set it as true, will ignore this match condition in the return result;

    An existed attribute column DOES NOT suit for this use case, the match condition will always affect the return result, if match condition does not satisfy, they won't be return in result.

      filter: filter(({"name", ignore_if_missing: true} == var_name and "age" > 1) or ("class" == "1"))
@callback one(schema(), options()) :: schema() | {:error, term()} | nil

Similar to get/3, please ensure schema entity has been filled with the whole primary key(s).

Options

  • :entity_full_match, whether to transfer the input attribute column(s) into the :== filtering expressions, by default it is false, when set entity_full_match: true, please notice the following rules:

    • If there exists attribute column(s) provided in entity, these fields will be combined within multiple :== filtering expressions;

    • If there exists attribute column(s) provided and meanwhile set filter option, they will be merged into a composite filter.

Other options please refer get/3.

@callback one!(schema(), options()) :: schema() | {:error, term()} | nil

See one/2 for more details.

Link to this callback

search(schema, index_name, options)

View Source
@callback search(schema(), index_name(), options()) ::
  {:ok, search_result()} | {:error, term()}

Provide search index features for the following scenarios:

  • MatchAllQuery
  • MatchQuery
  • MatchPhraseQuery
  • TermQuery
  • TermsQuery
  • PrefixQuery
  • RangeQuery
  • WildcardQuery
  • BoolQuery
  • NestedQuery
  • ExistsQuery
  • GeoBoundingBoxQuery
  • GeoDistanceQuery
  • GeoPolygonQuery

Please refer ExAliyunOts.Search query section for details, they are similar options for use, for example:

MyRepo.search(MySchema, "index_name",
  search_query: [
    query: exists_query("comment")
  ]
)
Link to this callback

sql_query(schema, sql_query)

View Source
@callback sql_query(schema(), sql_query()) :: {:ok, [schema()]} | {:error, term()}

Use SQL query data, only support select query.

Example

sql_query(Schema1, "SELECT * FROM table LIMIT 20")
@callback stream(schema(), options()) :: Enumerable.t()

As a wrapper built on stream_range/4 to create composable and lazy enumerables stream for iteration.

Options

Please see options of get_range/4 for details.

Link to this callback

stream_range(schema, start_primary_keys, end_primary_keys, options)

View Source
@callback stream_range(schema(), start_primary_keys(), end_primary_keys(), options()) ::
  Enumerable.t()

As a wrapper built on ExAliyunOts.stream_range/5 to create composable and lazy enumerables stream for iteration.

Options

Please see options of get_range/4 for details.

Link to this callback

stream_search(schema, index_name, options)

View Source
@callback stream_search(schema(), index_name(), options()) :: Enumerable.t()

As a wrapper built on ExAliyunOts.stream_search/4 to create composable and lazy enumerables stream for iteration.

Options

Please see options of search/3 for details.

Schema API

Link to this callback

batch_write(batch_writes, options)

View Source
@callback batch_write(batch_writes(), options()) :: {:ok, Keyword.t()} | {:error, term()}

Batch write several rows of data from one or more tables, this batch request puts multiple PutRow/DeleteRow/UpdateRow operations in one request from client's perspective.

After execute each operation in servers, return results independently and independently consumes capacity units.

If use a batch write request include a transaction ID, all rows in that request can only be written to the table that matches the transaction ID.

Options

  • transaction_id, use local transaction.

Example

The options of each :put, :delete, and :update operation are similar as ExAliyunOts.put_row/5, ExAliyunOts.delete_row/4 and ExAliyunOts.update_row/4, but transaction_id option is using in the options of EctoTablestore.Repo.batch_write/2.

By default, require to explicitly set the :condition option in each operation, excepts that if a table defined an auto increment primary key(aka non-partitioned primary key) which is processed in the server side, the server logic MUST use condition: condition(:ignore), in this case, this library internally forces to use condition: condition(:ignore), so we can omit the :condition option in the PutRow operation of a batch write.

If we set entity_full_match: true there will use the whole provided attribute-column field(s) of schema entity into the column_condition of condition filter, and always use row_existence: :EXPECT_EXIST, by default the entity_full_match option is false.

If put a row with an auto increment primary key, meanwhile set entity_full_match: true, the entity_full_match: true option is no effect, this library internally forces to use condition: condition(:ignore).

batch_write([
  delete: [
    {schema_entity_1, condition: condition(:ignore)}
    {schema_entity_2, condition: condition(:expect_exist)}
  ],
  put: [
    {%Schema1{}, condition: condition(:expect_not_exist)},
    {%Schema2{}, condition: condition(:ignore)},
    {%Schema3WithAutoIncrementPK{}, return_type: :pk},
    {changeset_schema_1, condition: condition(:ignore)}
  ],
  update: [
    {changeset_schema_1, return_type: :pk},
    {changeset_schema_2, entity_full_match: true}
  ]
])

Use transaction_id option:

batch_write(
  [
    delete: [...],
    put: [...],
    update: [...]
  ],
  transaction_id: "..."
)
Link to this callback

delete(schema_or_changeset, options)

View Source
@callback delete(schema_or_changeset(), options()) :: {:ok, schema()} | {:error, term()}

Delete a struct using its primary key.

Options

  • :condition, this option is required, whether to add conditional judgment before data delete.

    1. As condition(:expect_exist) means the primary key(s) can match a row to delete, if the row is existed, the delete result will be success, if the row is not existed, the delete result will be fail.

      We also can add some compare expressions for the attribute columns, e.g:

      condition(:expect_exist, "attr1" == value1 and "attr2" > 1)
      condition(:expect_exist, "attr1" != value1)
      condition(:expect_exist, "attr1" > 100 or "attr2" < 1000)
    2. As condition(:ignore) means DO NOT do any condition validation before delete, whether the row exists or not, both of the delete results will be success.

    3. As condition(:expect_not_exist) means expect the primary key(s) are NOT existed before delete, if the row is existed, the delete result will be fail, if the row is not existed, the delete result will be success.

  • :transaction_id, delete under local transaction in a partition key.

  • :stale_error_field - The field where stale errors will be added in the returning changeset. This option can be used to avoid raising Ecto.StaleEntryError.

  • :stale_error_message - The message to add to the configured :stale_error_field when stale errors happen, defaults to "is stale".

Link to this callback

insert(schema_or_changeset, options)

View Source
@callback insert(schema_or_changeset(), options()) :: {:ok, schema()} | {:error, term()}

Inserts a struct defined via EctoTablestore.Schema or a changeset.

If a table defined an auto increment primary key which is processed in the server side, the server logic MUST use condition(:ignore), in this case, this library internally forces to use condition(:ignore) to :condition option, so here we can omit the :condition option when insert a row.

Options

  • :condition, this option is required excepts that the table defined an auto increment primary key, whether to add conditional judgment before date insert.

    1. As condition(:ignore) means DO NOT do any condition validation before insert, whether the row exists or not, both of the insert results will be success, if the schema non-partitioned primary key is auto increment, we can only use condition(:ignore) option.
    2. As condition(:expect_not_exist) means expect the primary key(s) are NOT existed before insert, if the row is existed, the insert result will be fail, if the row is not existed, the insert result will be success.
    3. As condition(:expect_exist) means overwrite the whole existed row to the primary key(s), if the row is existed, the insert result will be success, if the row is not existed, the insert result will be fail.
  • :transaction_id, insert under local transaction in a partition key.

Link to this callback

update(changeset, options)

View Source
@callback update(changeset :: Ecto.Changeset.t(), options()) ::
  {:ok, schema()} | {:error, term()}

Updates a changeset using its primary key.

Options

  • :condition, this option is required, whether to add conditional judgment before data update.

    1. As condition(:expect_exist) means the primary key(s) can match a row to update, if the row is existed, the update result will be success, if the row is not existed, the update result will be fail.

      We also can add some compare expressions for the attribute columns, e.g:

      condition(:expect_exist, "attr1" == value1 and "attr2" > 1)
      condition(:expect_exist, "attr1" != value1)
      condition(:expect_exist, "attr1" > 100 or "attr2" < 1000)
    2. As condition(:ignore) means DO NOT do any condition validation before update, whether the row exists or not, both of the update results will be success.

    3. As condition(:expect_not_exist) means expect the primary key(s) are NOT existed before update, if the row is existed, the update will be fail, if the row is not existed, the update result will be success.

  • :transaction_id, update under local transaction in a partition key.

  • :stale_error_field - The field where stale errors will be added in the returning changeset. This option can be used to avoid raising Ecto.StaleEntryError.

  • :stale_error_message - The message to add to the configured :stale_error_field when stale errors happen, defaults to "is stale".

  • :returning, this option is required when the input changeset with :increment operation, all fields of the atomic increment operation are required to explicitly set into this option in any order, if missed any atomic increment operation related field, there will raise an Ecto.ConstraintError to prompt and terminate this update. If there is no :increment operation, the :returning option is no need to set. If set returning: true, but not really all fields are changed, the unchanged fields will be replaced as nil in the returned schema data.

Transaction API

@callback in_transaction?() :: boolean()

Returns true if the current process is inside a transaction.

Examples

MyRepo.in_transaction?
#=> false

MyRepo.transaction(fn ->
  MyRepo.in_transaction? #=> true
end)
@callback rollback(value :: any()) :: {:error, any()}

Rolls back the current transaction.

The transaction will return the value given as {:error, value}.

Note that calling rollback causes the code in the transaction to stop executing.

Similar to Ecto.Repo.rollback/1.

Link to this callback

transaction(transaction_fun, options)

View Source
@callback transaction(
  transaction_fun(),
  options :: [table: table_name(), partition_key: String.t()]
) :: {:ok, any()} | {:error, any()}

Adapt to use the Tablestore's local transaction to perform read and write operations on the data, the feature can perform atomic operations to read and write one or more rows.

Options

  • :table, this option is required, specifies the table name to enable local transaction.
  • :partition_key, this option is required as a tuple, specifies the partition key and its value to enable local transactions, for example {"partition_key_name", "partition_key_value"}

Example

{:ok, item_updated} = TestRepo.transaction(fn ->
    item = TestRepo.one(item)

    {:ok, updated} =
      item
      |> Changeset.change(array: ["more" | item.array])
      |> TestRepo.update(
        condition: :expect_exist,
        stale_error_message: "no_exist",
        stale_error_field: :condition,
        returning: [:array]
      )
    updated
  end,
  table: "TableName", partition_key: {"p_key", "pk_value1"}
)

Runtime API

@callback __adapter__() :: Ecto.Adapters.Tablestore.t()

Returns the adapter tied to the repository.

@callback start_link(options()) ::
  {:ok, pid()} | {:error, {:already_started, pid()}} | {:error, term()}

Please see Ecto.Repo.start_link/1 for details.

Types

@type batch_gets() :: [batch_get_item()]
@type batch_writes() :: [
  {operation :: :put, items :: [batch_put_item()]}
  | {operation :: :update, items :: [batch_update_item()]}
  | {operation :: :delete, items :: [batch_delete_item()]}
]
@type end_primary_keys() :: list()
@type index_name() :: String.t()
@type next_token() :: binary() | nil
@type options() :: Keyword.t()
@type schema() :: Ecto.Schema.t()
@type schema_or_changeset() :: Ecto.Schema.t() | Ecto.Changeset.t()
@type search_result() :: %{
  is_all_succeeded: boolean(),
  next_token: next_token(),
  schemas: list(),
  total_hits: integer()
}
@type sql_query() :: String.t()
@type start_primary_keys() :: list() | (token :: binary())
@type table_name() :: String.t()
@type transaction_fun() :: (() -> transaction_fun_return())
Link to this type

transaction_fun_abort_return()

View Source
@type transaction_fun_abort_return() ::
  {:error, reason :: any()} | {:abort, reason :: any()}
Link to this type

transaction_fun_commit_return()

View Source
@type transaction_fun_commit_return() ::
  {:ok, return :: any()} | {:commit, return :: any()} | any()
Link to this type

transaction_fun_return()

View Source
@type transaction_fun_return() ::
  transaction_fun_abort_return() | transaction_fun_commit_return()