ExAliyunOts (ex_aliyun_ots v0.14.2) View Source

The ExAliyunOts module provides a tablestore-based API as a client for working with Alibaba TableStore product servers.

Here are links to official documents in Chinese | English

Configuration

config :ex_aliyun_ots, :my_instance
  name: "MyInstanceName",
  endpoint: "MyInstanceEndpoint",
  access_key_id: "MyAliyunRAMKeyID",
  access_key_secret: "MyAliyunRAMKeySecret"

config :ex_aliyun_ots,
  instances: [:my_instance],
  debug: false,
  enable_tunnel: false
  • debug, optional, specifies whether to enable debug logger, by default it's false, and please DO NOT use debug mode in production.
  • enable_tunnel, optional, specifies whether to enable tunnel functions, there will startup tunnel related Supervisor and Registry when enable it, by default it's false.

Using ExAliyunOts

To use ExAliyunOts, a module that calls use ExAliyunOts has to be defined:

defmodule MyApp.TableStore do
  use ExAliyunOts, instance: :my_instance
end

This automatically defines some macros and functions in the MyApp.TableStore module, here are some examples:

import MyApp.TableStore

# Create table
create_table "table",
  [{"pk1", :integer}, {"pk2", :string}]

# Put row
put_row "table",
  [{"pk1", "id1"}],
  [{"attr1", 10}, {"attr2", "attr2_value"}],
  condition: condition(:expect_not_exist),
  return_type: :pk

# Search index
search "table", "index_name",
  search_query: [
    query: match_query("age", 28),
    sort: [
      field_sort("age", order: :desc)
    ]
  ]

# Local transaction
start_local_transaction "table", {"partition_key", "partition_value"}

ExAliyunOts API

There are two ways to use ExAliyunOts:

  • using macros and functions from your own ExAliyunOts module, like MyApp.TableStore.
  • using macros and functions from the ExAliyunOts module.

All defined functions and macros in ExAliyunOts are available and referable for your own ExAliyunOts module as well, except that the given arity of functions may different, because the instance parameter of each invoke request is NOT needed from your own ExAliyunOts module although the ExAliyunOts module defines it.

Link to this section Summary

Row

Official document in Chinese | English

Similar to condition/1 and support use filter expression (please see filter/1) as well, please refer them for details.

Official document in Chinese | English

Used in batch get operation, please see batch_get/2 for details.

As a wrapper built on get_range/5 to fetch a full matched data set by iterate, if process a large items, recommend to use stream_range/5.

Official document in Chinese | English

As a wrapper built on get_range/5 to create composable and lazy enumerable stream for iteration.

Used in batch write operation, please see batch_write/2 for details.

Used in batch write operation, please see batch_write/2 for details.

Used in batch write operation, please see batch_write/2 for details.

Link to this section Types

Link to this type

exclusive_end_primary_keys()

View Source

Specs

exclusive_end_primary_keys() :: list()
Link to this type

inclusive_start_primary_keys()

View Source

Specs

inclusive_start_primary_keys() :: list()

Specs

index_name() :: String.t()

Specs

instance() :: atom()

Specs

options() :: Keyword.t()

Specs

primary_keys() :: list()

Specs

result() :: {:ok, map()} | {:error, ExAliyunOts.Error.t()}

Specs

row_existence() :: :ignore | :expect_exist | :expect_not_exist

Specs

table_name() :: String.t()

Link to this section Table

Link to this function

compute_split_points_by_size(instance, table_name, splits_size)

View Source

Specs

compute_split_points_by_size(instance(), table_name(), splits_size :: integer()) ::
  result()

Official document in Chinese | English

Link to this function

create_index(instance, table_name, index_name, primary_keys, defined_columns, options \\ [])

View Source

Specs

create_index(
  instance(),
  table_name(),
  index_name(),
  primary_keys :: [String.t()],
  defined_columns :: [String.t()],
  options()
) :: :ok | {:error, ExAliyunOts.Error.t()}

Create global secondary indexes. Official document in Chinese | English

Example

create_index "table_name",
  "table_index_name1"
  ["pk1", "pk2", "col0"],
  ["col1", "col2"]

create_index "table_name",
  "table_index_name2"
  ["col0", "pk1"],
  ["col1", "col2", "col3"],
  include_base_data: false

Options

  • :index_update_mode, the update mode of the index table, optional, currently only support :IUM_ASYNC_INDEX, by default it is :IUM_ASYNC_INDEX;
  • :index_type, the type of the index table, optional, currently only support :IT_GLOBAL_INDEX, by default it is :IT_GLOBAL_INDEX;
  • :include_base_data, specifies whether the index table includes the existing data in the base table, if set it to true means the index includes the existing data, if set it to false means the index excludes the existing data, optional, by default it is true.
Link to this function

create_table(instance, table_name, primary_keys, options \\ [])

View Source

Specs

create_table(instance(), table_name(), primary_keys(), options()) ::
  :ok | {:error, ExAliyunOts.Error.t()}

Official document in Chinese | English

Example

create_table "table_name2",
  [{"key1", :string}, {"key2", :auto_increment}]

create_table "table_name3",
  [{"key1", :string}],
  reserved_throughput_write: 1,
  reserved_throughput_read: 1,
  time_to_live: 100_000,
  max_versions: 3,
  deviation_cell_version_in_sec: 6_400,
  stream_spec: [is_enabled: true, expiration_time: 2]

create_table "table_name",
  [{"key1", :string}],
  defined_columns: [
    {"attr1", :string},
    {"attr2", :integer},
    {"attr3", :boolean},
    {"attr4", :double},
    {"attr5", :binary}
  ]

create_table "table_name",
  [{"key1", :string}],
  index_metas: [
    {"indexname1", ["key1"], ["attr1", "attr2"]},
    {"indexname2", ["key1"], ["attr4"]}
  ]

Options

  • :reserved_throughput_write, optional, the reserved throughput write of table, by default it is 0.
  • :reserved_throughput_read, optional, the reserved throughput read of table, by default it is 0.
  • time_to_live, optional, the data storage time to live in seconds, the minimum settable value is 864_000 seconds (one day), by default it is -1 (for permanent).
  • :max_versions, optional, the version of table, by default it is 1 that specifies there is only one version for columns.
  • :deviation_cell_version_in_sec, optional, maximum version deviation, by default it is 864_000 seconds (one day).
  • :stream_spec, specifies whether enable stream, by default it is not enable stream feature.
    • :is_enabled, enable or not enable stream, use true or false;
    • :expiration_time, the expiration time of stream.
  • :index_metas, optional, the index meta of table, each item of :index_metas is in {String.t(), list(), list()} format, by default it is [].
  • :defined_columns, optional, the indexed attribute column, which is a combination of predefined columns of the base table, each item of :defined_columns is in {String.t(), :integer | :double | :boolean | :string | :binary} format, by default it is [].
Link to this function

delete_index(instance, table_name, index_name)

View Source

Specs

delete_index(instance(), table_name(), index_name()) ::
  :ok | {:error, ExAliyunOts.Error.t()}

Official document in Chinese | English

Example

import MyApp.TableStore

delete_index("table_name", "index_name")
Link to this function

delete_table(instance, table_name)

View Source

Specs

delete_table(instance(), table_name()) :: :ok | {:error, ExAliyunOts.Error.t()}

Official document in Chinese | English

Example

import MyApp.TableStore

delete_table("table_name")
Link to this function

describe_table(instance, table_name)

View Source

Specs

describe_table(instance(), table_name()) :: result()

Official document in Chinese | English

Example

import MyApp.TableStore

describe_table(table_name)

Specs

list_table(instance()) :: result()

Official document in Chinese | English

Example

import MyApp.TableStore

list_table()
Link to this function

update_table(instance, table_name, options \\ [])

View Source

Specs

update_table(instance(), table_name(), options()) :: result()

Official document in Chinese | English

Example

import MyApp.TableStore

update_table "table_name",
  reserved_throughput_write: 10,
  time_to_live: 200_000,
  stream_spec: [is_enabled: false]

Options

Please see options of create_table/4.

Link to this section Row

Link to this function

batch_get(instance, requests)

View Source

Specs

batch_get(instance(), requests :: list()) :: result()

Official document in Chinese | English

Example

import MyApp.TableStore

batch_get [
  get(table_name1, [[{"key1", 1}, {"key2", "1"}]]),
  get(
    table_name2,
    [{"key1", "key1"}],
    columns_to_get: ["name", "age"],
    filter: filter "age" >= 10
  )
]

The batch get operation can be considered as a collection of mulitple get/3 operations.

Link to this function

batch_write(instance, requests, options \\ [])

View Source

Specs

batch_write(instance(), requests :: list(), options()) :: result()

Official document in Chinese | English

Example

import MyApp.TableStore

batch_write [
  {"table1", [
    write_delete([{"key1", 5}, {"key2", "5"}],
      return_type: :pk,
      condition: condition(:expect_exist, "attr1" == 5)),
    write_put([{"key1", 6}, {"key2", "6"}],
      [{"new_put_val1", "val1"}, {"new_put_val2", "val2"}],
      condition: condition(:expect_not_exist),
      return_type: :pk)
  ]},
  {"table2", [
    write_update([{"key1", "new_tab3_id2"}],
      put: [{"new_put1", "u1"}, {"new_put2", 2.5}],
      condition: condition(:expect_not_exist)),
    write_put([{"key1", "new_tab3_id3"}],
      [{"new_put1", "put1"}, {"new_put2", 10}],
      condition: condition(:expect_not_exist))
  ]}
]

The batch write operation can be considered as a collection of multiple write_put/3, write_update/2 and write_delete/2 operations.

Options

  • :transaction_id, optional, batch write operation within local transaction.
  • :is_atomic, optional, defaults to false, whether set this batch write request be with an atomic operation, if this option is true, keep the partition key of each table in the batch write operation is unique, or the corresponding write operation of the table will fail.
Link to this macro

condition(row_existence)

View Source (macro)

Specs

condition(row_existence()) :: map()

Official document in Chinese | English

Example

import MyApp.TableStore

update_row "table", [{"pk", "pk1"}],
  delete_all: ["attr1", "attr2"],
  return_type: :pk,
  condition: condition(:expect_exist)

The available existence options: :expect_exist | :expect_not_exist | :ignore, here are some use cases for your reference:

Use condition(:expect_exist), expect the primary keys to row is existed.

  • for put_row/5, if the primary keys have auto increment column type, meanwhile the target primary keys row is existed, only use condition(:expect_exist) can successfully overwrite the row.
  • for update_row/4, if the primary keys have auto increment column type, meanwhile the target primary keys row is existed, only use condition(:expect_exist) can successfully update the row.
  • for delete_row/4, no matter what primary keys type are, use condition(:expect_exist) can successfully delete the row.

Use condition(:expect_not_exist), expect the primary_keys to row is not existed.

  • for put_row/5, if the primary keys have auto increment type,
    • while the target primary keys row is existed, only use condition(:expect_exist) can successfully put the row;
    • while the target primary keys row is not existed, only use condition(:ignore) can successfully put the row.

Use condition(:ignore), ignore the row existence check

  • for put_row/5, if the primary keys have auto increment column type, meanwhile the target primary keys row is not existed, only use condition(:ignore) can successfully put the row.
  • for update_row/4, if the primary keys have auto increment column type, meanwhile the target primary keys row is not existed, only use condition(:ignore) can successfully update the row.
  • for delete_row/4, no matter what primary keys type are, use condition(:ignore) can successfully delete the row if existed.

The batch_write/3 operation is a collection of put_row / update_row / delete_row operations.

Link to this macro

condition(row_existence, filter_expr)

View Source (macro)

Similar to condition/1 and support use filter expression (please see filter/1) as well, please refer them for details.

Example

import MyApp.TableStore

delete_row "table",
  [{"key", "key1"}, {"key2", "key2"}],
  condition: condition(:expect_exist, "attr_column" == "value2")
Link to this function

delete_row(instance, table_name, primary_keys, options \\ [])

View Source

Specs

delete_row(instance(), table_name(), primary_keys(), options()) :: result()

Official document in Chinese | English

Example

import MyApp.TableStore

delete_row "table1",
  [{"key1", 3}, {"key2", "3"}],
  condition: condition(:expect_exist, "attr2" == "value2")

delete_row "table1",
  [{"key1", 3}, {"key2", "3"}],
  condition: condition(:expect_exist, "attr2" == "value2"),
  transaction_id: "transaction_id"

Options

  • :condition, required, please see condition/1 or condition/2 for details.
  • :transaction_id, optional, write operation within local transaction.
Link to this macro

filter(filter_expr)

View Source (macro)

Official document in Chinese | English

Example

import MyApp.TableStore

get_row table_name1, [{"key", "key1"}],
  columns_to_get: ["name", "level"],
  filter: filter(
    ({"name", ignore_if_missing: true, latest_version_only: true} == var_name and "age" > 1) or
      ("class" == "1")
  )

batch_get [
  get(
    table_name2,
    [{"key", "key1"}],
    filter: filter "age" >= 10
  )
]

Options

  • ignore_if_missing, used when attribute column not existed.
    • if a attribute column is not existed, when set ignore_if_missing: true in filter expression, there will ignore this row data in the returned result;
    • if a attribute column is existed, the returned result won't be affected no matter true or false was set.
  • latest_version_only, used when attribute column has multiple versions.
    • if set latest_version_only: true, there will only check the value of the latest version is matched or not, by default it's set as latest_version_only: true;
    • if set latest_version_only: false, there will check the value of all versions are matched or not.
Link to this function

get(table_name, primary_keys, options \\ [])

View Source

Specs

get(table_name(), primary_keys(), options()) :: map()

Used in batch get operation, please see batch_get/2 for details.

Options

The available options are same as get_row/4.

Link to this function

get_range(instance, table_name, inclusive_start_primary_keys, exclusive_end_primary_keys, options \\ [])

View Source

Official document in Chinese | English

Example

import MyApp.TableStore

get_range "table_name",
  [{"key1", 1}, {"key2", :inf_min}],
  [{"key1", 4}, {"key2", :inf_max}],
  direction: :forward

get_range "table_name",
  [{"key1", 1}, {"key2", :inf_min}],
  [{"key1", 4}, {"key2", :inf_max}],
  time_range: {1525922253224, 1525923253224},
  direction: :forward

get_range "table_name",
  [{"key1", 1}, {"key2", :inf_min}],
  [{"key1", 4}, {"key2", :inf_max}],
  time_range: 1525942123224,
  direction: :forward

Also, there is an alternative stream_range/5 to iteratively get range of rows in stream.

Options

  • :direction, required, the order of fetch data, available options are :forward | :backward, by it is :forward.
    • :forward, this query is performed in the order of primary key in ascending, in this case, input inclusive_start_primary_keys should less than exclusive_end_primary_keys;
    • :backward, this query is performed in the order of primary key in descending, in this case, input inclusive_start_primary_keys should greater than exclusive_end_primary_keys.
  • :columns_to_get, optional, fetch the special fields, by default it returns all fields, pass a field list to specify the expected return fields, e.g. ["field1", "field2"].
  • :start_column, optional, specifies the start column when using for wide-row-read, the returned result contains this :start_column.
  • :end_column, optional, specifies the end column when using for wide-row-read, the returned result does not contain this :end_column.
  • :filter, optional, filter the return results in the server side, please see filter/1 for details.
  • :max_versions, optional, how many versions need to return in results, by default it is 1.
  • :transaction_id, optional, read operation within local transaction.
  • :limit, optional, the maximum number of rows of data to be returned, this value must be greater than 0, whether this option is set or not, there returns a maximum of 5,000 data rows and the total data size never exceeds 4 MB.
  • :time_range, optional, read data by timestamp range, support two ways to use it:
    • time_range: {start_timestamp, end_timestamp}, the timestamp in the range (include start_timestamp but exclude end_timestamp) and then will return in the results.
    • time_range: special_timestamp, exactly match and then will return in the results.
    • :time_range and :max_versions are mutually exclusive, by default use max_versions: 1 and time_range: nil.
Link to this function

get_row(instance, table_name, primary_keys, options \\ [])

View Source

Specs

get_row(instance(), table_name(), primary_keys(), options()) :: result()

Official document in Chinese | English

Example

import MyApp.TableStore

get_row "table1",
  [{"key1", "id1"}, {"key2", "id2"}],
  columns_to_get: ["name", "level"],
  filter: filter(("name[ignore_if_missing: true, latest_version_only: true]" == var_name and "age" > 1) or ("class" == "1"))

get_row "table2",
  [{"key", "1"}],
  start_column: "room",
  filter: pagination(offset: 0, limit: 3)

get_row "table3",
  [{"key", "1"}],
  transaction_id: "transaction_id"

Options

  • :columns_to_get, optional, fetch the special fields, by default it returns all fields, pass a field list to specify the expected return fields e.g. ["field1", "field2"].
  • :start_column, optional, specifies the start column when using for wide-row-read, the returned result contains this :start_column.
  • :end_column, optional, specifies the end column when using for wide-row-read, the returned result does not contain this :end_column.
  • :filter, optional, filter the return results in the server side, please see filter/1 for details.
  • :max_versions, optional, how many versions need to return in results, by default it is 1.
  • :time_range, optional, read data by timestamp range, support two ways to use it:
    • time_range: {start_timestamp, end_timestamp}, the timestamp in the range (include start_timestamp but exclude end_timestamp) and then will return in the results.
    • time_range: special_timestamp, exactly match and then will return in the results.
    • :time_range and :max_versions are mutually exclusive, by default use max_versions: 1 and time_range: nil.
  • :transaction_id, optional, read operation within local transaction.
Link to this function

iterate_all_range(instance, table_name, inclusive_start_primary_keys, exclusive_end_primary_keys, options \\ [])

View Source

Specs

As a wrapper built on get_range/5 to fetch a full matched data set by iterate, if process a large items, recommend to use stream_range/5.

Example

import MyApp.TableStore

iterate_all_range table_name1,
  [{"key1", 1}, {"key2", :inf_min}],
  [{"key1", 4}, {"key2", :inf_max}],
  direction: :forward

Options

Please see options of get_range/5 for details.

Link to this macro

pagination(options)

View Source (macro)

Specs

pagination(options :: Keyword.t()) :: map()

Official document in Chinese | English

Example

import MyApp.TableStore

get_row table_name,
  [{"key", "1"}],
  start_column: "room",
  filter: pagination(offset: 0, limit: 3)

Use pagination/1 for :filter options when get row.

Link to this function

put_row(instance, table_name, primary_keys, attrs, options \\ [])

View Source

Official document in Chinese | English

Example

import MyApp.TableStore

put_row "table1",
  [{"key1", "id1"}],
  [{"name", "name1"}, {"age", 20}],
  condition: condition(:expect_not_exist),
  return_type: :pk

put_row "table2",
  [{"key1", "id1"}],
  [{"name", "name1"}, {"age", 20}],
  condition: condition(:expect_not_exist),
  transaction_id: "transaction_id"
  return_type: :pk

Options

  • :condition, required, please see condition/1 or condition/2 for details.
  • :return_type, optional, whether return the primary keys after put row, available options are :pk | :none, by default it is :none.
  • :transaction_id, optional, write operation within local transaction.
Link to this function

stream_range(instance, table_name, inclusive_start_primary_keys, exclusive_end_primary_keys, options \\ [])

View Source

As a wrapper built on get_range/5 to create composable and lazy enumerable stream for iteration.

Example

import MyApp.TableStore

stream =
  stream_range table_name1,
    [{"key1", 1}, {"key2", :inf_min}],
    [{"key1", 4}, {"key2", :inf_max}],
    direction: :forward

Enum.to_list(stream, fn
  {:ok, %{rows: rows} = response} ->
    # process rows
  {:error, error} ->
    # occur error
end)

Options

Please see options of get_range/5 for details.

Link to this function

update_row(instance, table_name, primary_keys, options \\ [])

View Source

Specs

update_row(instance(), table_name(), primary_keys(), options()) :: result()

Official document in Chinese | English

Example

import MyApp.TableStore

value = "1"
update_row "table1",
  [{"key1", 2}, {"key2", "2"}],
  delete: [{"attr2", nil, 1524464460}],
  delete_all: ["attr1"],
  put: [{"attr3", "put_attr3"}],
  return_type: :pk,
  condition: condition(:expect_exist, "attr2" == value)

update_row "table2",
  [{"key1", 1}],
  put: [{"attr1", "put_attr1"}],
  increment: [{"count", 1}],
  return_type: :after_modify,
  return_columns: ["count"],
  condition: condition(:ignore)

update_row "table3",
  [partition_key],
  put: [{"new_attr1", "a1"}],
  delete_all: ["level", "size"],
  condition: condition(:ignore),
  transaction_id: "transaction_id"

Options

  • :put, optional, require to be valid value, e.g. [{"field1", "value"}, {...}], insert a new column if this field is not existed, or overwrite this field if existed.
  • :delete, optional, delete the special version of a column or columns, please pass the column's version (timestamp) in :delete option, e.g. [{"field1", nil, 1524464460}, ...].
  • :delete_all, optional, delete all versions of a column or columns, e.g. ["field1", "field2", ...].
  • :increment, optional, attribute column(s) base on atomic counters for increment or decrement, require the value of column is integer.
    • for increment, increment: [{"count", 1}];
    • for decrement, increment: [{"count", -1}].
  • :return_type, optional, whether return the primary keys after update row, available options are :pk | :none | :after_modify, by default it is :none.
    • if use atomic counters, must set return_type: :after_modify.
  • :condition, required, please see condition/1 or condition/2 for details.
  • :transaction_id, optional, write operation within local transaction.
Link to this function

write_delete(primary_keys, options \\ [])

View Source

Specs

write_delete(primary_keys(), options()) :: map()

Used in batch write operation, please see batch_write/2 for details.

Options

The available operation same as delete_row/4.

Link to this function

write_put(primary_keys, attrs, options \\ [])

View Source

Specs

write_put(primary_keys(), attrs :: list(), options()) :: map()

Used in batch write operation, please see batch_write/2 for details.

Options

The available options are same as put_row/5.

Link to this function

write_update(primary_keys, options \\ [])

View Source

Specs

write_update(primary_keys(), options()) :: map()

Used in batch write operation, please see batch_write/2 for details.

Options

The available options are same as update_row/4.

Link to this section Local Transaction

Link to this function

abort_transaction(instance, transaction_id)

View Source

Official document in Chinese | English

Example

import MyApp.TableStore

abort_transaction("transaction_id")
Link to this function

commit_transaction(instance, transaction_id)

View Source

Specs

commit_transaction(instance(), transaction_id :: String.t()) :: result()

Official document in Chinese | English

Example

import MyApp.TableStore

commit_transaction("transaction_id")
Link to this function

start_local_transaction(instance, table_name, partition_key)

View Source

Specs

start_local_transaction(instance(), table_name(), partition_key :: tuple()) ::
  result()

Official document in Chinese | English

Example

import MyApp.TableStore

partition_key = {"key", "key1"}
start_local_transaction("table", partition_key)