ExAliyunOts (ex_aliyun_ots v0.15.1)
The ExAliyunOts module provides a client API for working with the Alibaba TableStore service.
Here are links to the official documentation in Chinese | English
Configuration
config :ex_aliyun_ots, :my_instance,
  name: "MyInstanceName",
  endpoint: "MyInstanceEndpoint",
  access_key_id: "MyAliyunRAMKeyID",
  access_key_secret: "MyAliyunRAMKeySecret"

config :ex_aliyun_ots,
  instances: [:my_instance],
  debug: false,
  enable_tunnel: false
debug, optional, specifies whether to enable the debug logger, by default it is false; please DO NOT use debug mode in production.
enable_tunnel, optional, specifies whether to enable tunnel functions; when enabled, the tunnel-related Supervisor and Registry are started, by default it is false.
Using ExAliyunOts
To use ExAliyunOts
, a module that calls use ExAliyunOts
has to be defined:
defmodule MyApp.TableStore do
use ExAliyunOts, instance: :my_instance
end
This automatically defines some macros and functions in the MyApp.TableStore
module, here are some examples:
import MyApp.TableStore
# Create table
create_table "table",
[{"pk1", :integer}, {"pk2", :string}]
# Put row
put_row "table",
[{"pk1", "id1"}],
[{"attr1", 10}, {"attr2", "attr2_value"}],
condition: :expect_not_exist,
return_type: :pk
# Search index
search "table", "index_name",
search_query: [
query: match_query("age", 28),
sort: [
field_sort("age", order: :desc)
]
]
# Local transaction
start_local_transaction "table", {"partition_key", "partition_value"}
ExAliyunOts API
There are two ways to use ExAliyunOts:

- using macros and functions from your own ExAliyunOts module, like MyApp.TableStore;
- using macros and functions from the ExAliyunOts module directly.
All functions and macros defined in ExAliyunOts are available in your own ExAliyunOts module as well, except that the arity of some functions differs: the instance parameter required by each request in the ExAliyunOts module is not needed when calling through your own module.
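For illustration, assuming the configuration above, the following two calls are equivalent (a minimal sketch; the table and primary keys are placeholders):

# Call through your own module, where the instance is implicit:
MyApp.TableStore.get_row("table", [{"key1", "id1"}])

# Call the ExAliyunOts module directly, passing the instance as the first argument:
ExAliyunOts.get_row(:my_instance, "table", [{"key1", "id1"}])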
Summary
Row
Similar to condition/1, and also supports filter expressions (please see filter/1); refer to them for details.
Used in the batch get operation, please see batch_get/2 for details.
A wrapper built on get_range/5 that fetches the full matched data set by iteration; when processing a large number of items, stream_range/5 is recommended.
A wrapper built on get_range/5 to create a composable and lazy enumerable stream for iteration.
Used in the batch write operation, please see batch_write/2 for details.
Used in the batch write operation, please see batch_write/2 for details.
Used in the batch write operation, please see batch_write/2 for details.
Search
Queries the currently supported maximum number of concurrent tasks for a parallel_scan/4 request.
A simple wrapper of stream_parallel_scan/4 that handles the OTSSessionExpired error with a retry, making parallel scan a stream that applies the given function to the complete result of the scan query.
A simple wrapper of stream_parallel_scan/4 that handles the OTSSessionExpired error with a retry, making parallel scan a stream that applies the given function from module with the list of arguments args to the complete result of the scan query.
A wrapper built on stream_search/4 that fetches the full matched data set as a stream and then uses Enum.reduce/2 to format all data into a list; when processing a large number of items, stream_search/4 is recommended.
Leverages concurrent tasks to query matched raw data (still through the search function) more quickly. This function speeds up the scan query, but the order of query results is not guaranteed, and aggregations of the scan query are not supported.
The single entry point for the search index functions; please see the ExAliyunOts.Search module for details.
Integrates parallel_scan/4 with compute_splits/3 as a complete flow: based on the response of compute_splits/3, creates the corresponding number of concurrent tasks and uses Task.async_stream/3 to make parallel scan a stream that properly handles the token in every internal request; when using this function, consider the possibility of an OTSSessionExpired error externally.
A wrapper built on search/4 to create a composable and lazy enumerable stream for iteration.
Table
@spec compute_split_points_by_size(instance(), table_name(), splits_size :: integer()) :: result()
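A minimal usage sketch of compute_split_points_by_size/3 (the table name and split size are placeholder values):

import MyApp.TableStore
compute_split_points_by_size("table_name", 4)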
create_index(instance, table_name, index_name, primary_keys, defined_columns, options \\ [])
@spec create_index(instance(), table_name(), index_name(), primary_keys :: [String.t()], defined_columns :: [String.t()], options()) :: :ok | {:error, ExAliyunOts.Error.t()}
Create secondary indexes. Official document in Chinese | English
Example
# global secondary index
create_index "table_name",
"table_index_global",
["col0", "pk2"],
["col1", "col2"]
# local secondary index
create_index "table_name",
"table_index_local",
["pk0", "col1"],
["col2", "col3"],
index_type: :local
# do NOT include existing data in base table
create_index "table_name",
"table_index_global_without_existing_data",
["col0", "pk1"],
["col1", "col2", "col3"],
include_base_data: false
Options
:index_type, optional, the type of the index. Valid values: :global and :local. By default it is :global.
- If :index_type is not specified or is set to :global, the global secondary index feature is used. With a global secondary index, Tablestore automatically synchronizes the indexed columns and primary key columns from the data table to the index table in asynchronous mode, with a synchronization latency within a few milliseconds.
- If :index_type is set to :local, the local secondary index feature is used. With a local secondary index, Tablestore automatically synchronizes the indexed columns and primary key columns from the data table to the index table in synchronous mode; after the data is written to the data table, it can be queried from the index table immediately.
:include_base_data, optional, specifies whether the index table includes the existing data of the base table; true means the index includes the existing data, false means it excludes it. By default it is true.
@spec create_table(instance(), table_name(), primary_keys(), options()) :: :ok | {:error, ExAliyunOts.Error.t()}
Official document in Chinese | English
Example
create_table "table_name2",
[{"key1", :string}, {"key2", :auto_increment}]
create_table "table_name3",
[{"key1", :string}],
reserved_throughput_write: 1,
reserved_throughput_read: 1,
time_to_live: 100_000,
max_versions: 3,
deviation_cell_version_in_sec: 6_400,
stream_spec: [is_enabled: true, expiration_time: 2]
create_table "table_name",
[{"key1", :string}],
defined_columns: [
{"attr1", :string},
{"attr2", :integer},
{"attr3", :boolean},
{"attr4", :double},
{"attr5", :binary}
]
create_table "table_name",
[{"key1", :string}, {"key2", :string}],
defined_columns: [
{"attr1", :string},
{"attr2", :integer},
{"attr3", :boolean},
{"attr4", :double}
],
index_metas: [
{"index_global1", ["attr2", "key2"], ["attr1", "attr3"]},
{"index_global2", ["attr3", "key1"], ["attr1", "attr2"], index_type: :global},
{"index_local", ["key1", "attr1"], ["attr4"], index_type: :local}
]
Options
:reserved_throughput_write, optional, the reserved write throughput of the table, by default it is 0.
:reserved_throughput_read, optional, the reserved read throughput of the table, by default it is 0.
:time_to_live, optional, the data storage time to live in seconds; the minimum settable value is 864_000 seconds (one day), by default it is -1 (permanent).
:max_versions, optional, the maximum number of versions kept per column, by default it is 1, meaning only one version is kept for each column.
:deviation_cell_version_in_sec, optional, the maximum version deviation, by default it is 864_000 seconds (one day).
:stream_spec, optional, specifies whether to enable the stream feature, which is disabled by default.
- :is_enabled, enable or disable the stream, use true or false;
- :expiration_time, the expiration time of the stream.
:index_metas, optional, the index metas of the table; each item of :index_metas is in {String.t(), list(), list()} | {String.t(), list(), list(), Keyword.t()} format, by default it is [].
:defined_columns, optional, the predefined attribute columns of the base table from which indexed columns are combined; each item of :defined_columns is in {String.t(), :integer | :double | :boolean | :string | :binary} format, by default it is [].
:enable_local_txn, optional, specifies whether to enable the local transaction feature; the value is a :boolean, by default it is false. If you want to enable the local transaction feature when you create a data table, set this option to true.
@spec delete_index(instance(), table_name(), index_name()) :: :ok | {:error, ExAliyunOts.Error.t()}
Official document in Chinese | English
Example
import MyApp.TableStore
delete_index("table_name", "index_name")
@spec delete_table(instance(), table_name()) :: :ok | {:error, ExAliyunOts.Error.t()}
Official document in Chinese | English
Example
import MyApp.TableStore
delete_table("table_name")
@spec describe_table(instance(), table_name()) :: result()
Official document in Chinese | English
Example
import MyApp.TableStore
describe_table(table_name)
Official document in Chinese | English
Example
import MyApp.TableStore
list_table()
@spec update_table(instance(), table_name(), options()) :: result()
Official document in Chinese | English
Example
import MyApp.TableStore
update_table "table_name",
reserved_throughput_write: 10,
time_to_live: 200_000,
stream_spec: [is_enabled: false]
Options
Please see options of create_table/4
.
Row
Official document in Chinese | English
Example
import MyApp.TableStore
batch_get [
get(table_name1, [[{"key1", 1}, {"key2", "1"}]]),
get(
table_name2,
[{"key1", "key1"}],
columns_to_get: ["name", "age"],
filter: filter "age" >= 10
)
]
The batch get operation can be considered as a collection of multiple get/3 operations.
Official document in Chinese | English
Example
import MyApp.TableStore
batch_write [
{"table1", [
write_delete([{"key1", 5}, {"key2", "5"}],
return_type: :pk,
condition: condition(:expect_exist, "attr1" == 5)),
write_put([{"key1", 6}, {"key2", "6"}],
[{"new_put_val1", "val1"}, {"new_put_val2", "val2"}],
condition: condition(:expect_not_exist),
return_type: :pk)
]},
{"table2", [
write_update([{"key1", "new_tab3_id2"}],
put: [{"new_put1", "u1"}, {"new_put2", 2.5}],
condition: :expect_not_exist),
write_put([{"key1", "new_tab3_id3"}],
[{"new_put1", "put1"}, {"new_put2", 10}],
condition: :expect_not_exist)
]}
]
The batch write operation can be considered as a collection of multiple write_put/3, write_update/2 and write_delete/2 operations.
Options
:transaction_id, optional, perform the batch write operation within a local transaction.
:is_atomic, optional, defaults to false, whether this batch write request is an atomic operation; if this option is true, the rows written to each table must share the same partition key value, otherwise the corresponding write operation of that table will fail (see the sketch below).
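A minimal sketch of an atomic batch write; the table, keys and values here are placeholders, and every row written to "table1" shares the same partition key value, as the atomic mode requires:

import MyApp.TableStore

batch_write [
  {"table1", [
    write_put([{"key1", "partkey"}, {"key2", "1"}],
      [{"attr1", "value1"}],
      condition: :ignore),
    write_put([{"key1", "partkey"}, {"key2", "2"}],
      [{"attr1", "value2"}],
      condition: :ignore)
  ]}
], is_atomic: true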
@spec condition(row_existence()) :: map()
Official document in Chinese | English
Example
import MyApp.TableStore
update_row "table", [{"pk", "pk1"}],
delete_all: ["attr1", "attr2"],
return_type: :pk,
condition: condition(:expect_exist)
The available existence options: :expect_exist | :expect_not_exist | :ignore. Here are some use cases for your reference:

Use condition(:expect_exist), expecting the row with the given primary keys to exist:
- for put_row/5, if the primary keys contain an auto-increment column and the target row exists, only condition(:expect_exist) can successfully overwrite the row.
- for update_row/4, if the primary keys contain an auto-increment column and the target row exists, only condition(:expect_exist) can successfully update the row.
- for delete_row/4, regardless of the primary key types, condition(:expect_exist) can successfully delete the row.

Use condition(:expect_not_exist), expecting the row with the given primary keys to not exist:
- for put_row/5, if the primary keys contain an auto-increment column:
  - when the target row exists, only condition(:expect_exist) can successfully put the row;
  - when the target row does not exist, only condition(:ignore) can successfully put the row.

Use condition(:ignore), ignoring the row existence check:
- for put_row/5, if the primary keys contain an auto-increment column and the target row does not exist, only condition(:ignore) can successfully put the row.
- for update_row/4, if the primary keys contain an auto-increment column and the target row does not exist, only condition(:ignore) can successfully update the row.
- for delete_row/4, regardless of the primary key types, condition(:ignore) can successfully delete the row if it exists.
The batch_write/3
operation is a collection of put_row / update_row / delete_row operations.
Similar to condition/1, and also supports filter expressions (please see filter/1); refer to them for details.
Example
import MyApp.TableStore
delete_row "table",
[{"key", "key1"}, {"key2", "key2"}],
condition: condition(:expect_exist, "attr_column" == "value2")
@spec delete_row(instance(), table_name(), primary_keys(), options()) :: result()
Official document in Chinese | English
Example
import MyApp.TableStore
delete_row "table1",
[{"key1", 3}, {"key2", "3"}],
condition: condition(:expect_exist, "attr2" == "value2")
delete_row "table1",
[{"key1", 3}, {"key2", "3"}],
condition: condition(:expect_exist, "attr2" == "value2"),
transaction_id: "transaction_id"
Options
:condition, required, please see condition/1 or condition/2 for details.
:transaction_id, optional, perform the write operation within a local transaction.
Official document in Chinese | English
Example
import MyApp.TableStore
get_row table_name1, [{"key", "key1"}],
columns_to_get: ["name", "level"],
filter: filter(
({"name", ignore_if_missing: true, latest_version_only: true} == var_name and "age" > 1) or
("class" == "1")
)
batch_get [
get(
table_name2,
[{"key", "key1"}],
filter: filter "age" >= 10
)
]
put_row(table_name1, [{"key", "key1"}], [{"type", "t:5"}])
# Use `~r/\d+/` regex expression to fetch the matched part (in this case it is "5") from
# the attribute column field, and then cast it into an integer for the "==" comparator.
#
get_row table_name1, [{"key", "key1"}],
filter: filter(
{"type", value_trans_rule: {~r/\d+/, :integer}} == 5
)
Options
ignore_if_missing, used when an attribute column does not exist.
- If an attribute column does not exist and ignore_if_missing: true is set in the filter expression, this row is ignored in the returned result;
- if an attribute column exists, the returned result is not affected whether true or false is set.
latest_version_only, used when an attribute column has multiple versions.
- If latest_version_only: true is set, only the value of the latest version is checked for a match; by default it is latest_version_only: true;
- if latest_version_only: false is set, the values of all versions are checked for a match.
value_trans_rule, optional, a two-element tuple containing a Regex expression and one of the [:integer, :double, :string] atoms as a cast type; the part matched by the regex expression is cast into the corresponding type and then used in the current condition comparator.
@spec get(table_name(), primary_keys(), options()) :: map()
Used in the batch get operation, please see batch_get/2 for details.
Options
The available options are the same as get_row/4.
get_range(instance, table_name, inclusive_start_primary_keys, exclusive_end_primary_keys, options \\ [])
Official document in Chinese | English
Example
import MyApp.TableStore
get_range "table_name",
[{"key1", 1}, {"key2", :inf_min}],
[{"key1", 4}, {"key2", :inf_max}],
direction: :forward
get_range "table_name",
[{"key1", 1}, {"key2", :inf_min}],
[{"key1", 4}, {"key2", :inf_max}],
time_range: {1525922253224, 1525923253224},
direction: :forward
get_range "table_name",
[{"key1", 1}, {"key2", :inf_min}],
[{"key1", 4}, {"key2", :inf_max}],
time_range: 1525942123224,
direction: :forward
Also, there is an alternative stream_range/5 to iteratively get a range of rows as a stream.
Options
:direction, required, the order in which data is fetched, available options are :forward | :backward, by default it is :forward.
- :forward, the query is performed over primary keys in ascending order; in this case, the input inclusive_start_primary_keys should be less than exclusive_end_primary_keys;
- :backward, the query is performed over primary keys in descending order; in this case, the input inclusive_start_primary_keys should be greater than exclusive_end_primary_keys.
:columns_to_get, optional, fetch specific fields, by default all fields are returned; pass a field list to specify the expected return fields, e.g. ["field1", "field2"].
:start_column, optional, specifies the start column when used for a wide-row read; the returned result contains this :start_column.
:end_column, optional, specifies the end column when used for a wide-row read; the returned result does not contain this :end_column.
:filter, optional, filter the results on the server side, please see filter/1 for details.
:max_versions, optional, how many versions to return in the results, by default it is 1.
:transaction_id, optional, perform the read operation within a local transaction.
:limit, optional, the maximum number of rows of data to be returned; this value must be greater than 0. Whether or not this option is set, at most 5,000 rows are returned and the total data size never exceeds 4 MB.
:time_range, optional, read data by timestamp range, supported in two ways:
- time_range: {start_timestamp, end_timestamp}, timestamps within the range (including start_timestamp but excluding end_timestamp) are returned in the results;
- time_range: specific_timestamp, an exact match is returned in the results.
:time_range and :max_versions are mutually exclusive; by default max_versions: 1 and time_range: nil are used.
@spec get_row(instance(), table_name(), primary_keys(), options()) :: result()
Official document in Chinese | English
Example
import MyApp.TableStore
get_row "table1",
[{"key1", "id1"}, {"key2", "id2"}],
columns_to_get: ["name", "level"],
filter: filter(("name[ignore_if_missing: true, latest_version_only: true]" == var_name and "age" > 1) or ("class" == "1"))
get_row "table2",
[{"key", "1"}],
start_column: "room",
filter: pagination(offset: 0, limit: 3)
get_row "table3",
[{"key", "1"}],
transaction_id: "transaction_id"
Options
:columns_to_get, optional, fetch specific fields, by default all fields are returned; pass a field list to specify the expected return fields, e.g. ["field1", "field2"].
:start_column, optional, specifies the start column when used for a wide-row read; the returned result contains this :start_column.
:end_column, optional, specifies the end column when used for a wide-row read; the returned result does not contain this :end_column.
:filter, optional, filter the results on the server side, please see filter/1 for details.
:max_versions, optional, how many versions to return in the results, by default it is 1.
:time_range, optional, read data by timestamp range, supported in two ways:
- time_range: {start_timestamp, end_timestamp}, timestamps within the range (including start_timestamp but excluding end_timestamp) are returned in the results;
- time_range: specific_timestamp, an exact match is returned in the results.
:time_range and :max_versions are mutually exclusive; by default max_versions: 1 and time_range: nil are used.
:transaction_id, optional, perform the read operation within a local transaction.
iterate_all_range(instance, table_name, inclusive_start_primary_keys, exclusive_end_primary_keys, options \\ [])
@spec iterate_all_range(instance(), table_name(), inclusive_start_primary_keys(), exclusive_end_primary_keys(), options()) :: result()
A wrapper built on get_range/5 that fetches the full matched data set by iteration; when processing a large number of items, stream_range/5 is recommended.
Example
import MyApp.TableStore
iterate_all_range table_name1,
[{"key1", 1}, {"key2", :inf_min}],
[{"key1", 4}, {"key2", :inf_max}],
direction: :forward
Options
Please see options of get_range/5
for details.
Official document in Chinese | English
Example
import MyApp.TableStore
get_row table_name,
[{"key", "1"}],
start_column: "room",
filter: pagination(offset: 0, limit: 3)
Use pagination/1 as the :filter option when getting a row.
Official document in Chinese | English
Example
import MyApp.TableStore
put_row "table1",
[{"key1", "id1"}],
[{"name", "name1"}, {"age", 20}],
condition: :expect_not_exist,
return_type: :pk
put_row "table2",
[{"key1", "id1"}],
[{"name", "name1"}, {"age", 20}],
condition: :expect_not_exist,
transaction_id: "transaction_id",
return_type: :pk
Options
:condition, required, please see condition/1 or condition/2 for details.
:return_type, optional, whether to return the primary keys after putting the row, available options are :pk | :none, by default it is :none.
:transaction_id, optional, perform the write operation within a local transaction.
stream_range(instance, table_name, inclusive_start_primary_keys, exclusive_end_primary_keys, options \\ [])
A wrapper built on get_range/5 to create a composable and lazy enumerable stream for iteration.
Example
import MyApp.TableStore
stream =
stream_range table_name1,
[{"key1", 1}, {"key2", :inf_min}],
[{"key1", 4}, {"key2", :inf_max}],
direction: :forward
Enum.each(stream, fn
  {:ok, %{rows: rows}} ->
    # process rows
    IO.inspect(rows)
  {:error, error} ->
    # handle the error
    IO.inspect(error)
end)
Options
Please see options of get_range/5
for details.
@spec update_row(instance(), table_name(), primary_keys(), options()) :: result()
Official document in Chinese | English
Example
import MyApp.TableStore
value = "1"
update_row "table1",
[{"key1", 2}, {"key2", "2"}],
delete: [{"attr2", nil, 1524464460}],
delete_all: ["attr1"],
put: [{"attr3", "put_attr3"}],
return_type: :pk,
condition: condition(:expect_exist, "attr2" == value)
update_row "table2",
[{"key1", 1}],
put: [{"attr1", "put_attr1"}],
increment: [{"count", 1}],
return_type: :after_modify,
return_columns: ["count"],
condition: :ignore
update_row "table3",
[partition_key],
put: [{"new_attr1", "a1"}],
delete_all: ["level", "size"],
condition: :ignore,
transaction_id: "transaction_id"
Options
:put, optional, requires a valid value, e.g. [{"field1", "value"}, {...}]; inserts a new column if the field does not exist, or overwrites the field if it does.
:delete, optional, delete the specified version of a column or columns; pass the column's version (timestamp) in the :delete option, e.g. [{"field1", nil, 1524464460}, ...].
:delete_all, optional, delete all versions of a column or columns, e.g. ["field1", "field2", ...].
:increment, optional, increment or decrement attribute column(s) based on atomic counters; requires the column value to be an integer.
- for increment, increment: [{"count", 1}];
- for decrement, increment: [{"count", -1}].
:return_type, optional, whether to return the primary keys after updating the row, available options are :pk | :none | :after_modify, by default it is :none.
- If atomic counters are used, return_type: :after_modify must be set.
:condition, required, please see condition/1 or condition/2 for details.
:transaction_id, optional, perform the write operation within a local transaction.
@spec write_delete(primary_keys(), options()) :: map()
Used in the batch write operation, please see batch_write/2 for details.
Options
The available options are the same as delete_row/4.
@spec write_put(primary_keys(), attrs :: list(), options()) :: map()
Used in the batch write operation, please see batch_write/2 for details.
Options
The available options are the same as put_row/5.
@spec write_update(primary_keys(), options()) :: map()
Used in the batch write operation, please see batch_write/2 for details.
Options
The available options are the same as update_row/4.
Local Transaction
Official document in Chinese | English
Example
import MyApp.TableStore
abort_transaction("transaction_id")
Official document in Chinese | English
Example
import MyApp.TableStore
commit_transaction("transaction_id")
@spec start_local_transaction(instance(), table_name(), partition_key :: tuple()) :: result()
Official document in Chinese | English
Example
import MyApp.TableStore
partition_key = {"key", "key1"}
start_local_transaction("table", partition_key)
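A sketch of a complete transaction flow, combining start_local_transaction/2 with the :transaction_id option of the row operations and commit_transaction/1 (assuming the successful response exposes a transaction_id field):

import MyApp.TableStore

{:ok, response} = start_local_transaction("table", {"key", "key1"})
transaction_id = response.transaction_id

put_row "table",
  [{"key", "key1"}, {"key2", "id1"}],
  [{"attr1", "value1"}],
  condition: :ignore,
  transaction_id: transaction_id

# Commit on success, or call abort_transaction(transaction_id) to roll back.
commit_transaction(transaction_id)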
Search
@spec compute_splits(instance(), table_name(), index_name()) :: result()
Queries the currently supported maximum number of concurrent tasks for a parallel_scan/4 request.
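A minimal call sketch; the splits_size field of a successful response can be used as the :max_parallel option of parallel_scan/4 (assuming the response exposes splits_size):

import MyApp.TableStore
{:ok, response} = compute_splits("table", "index_name")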
@spec create_search_index(instance(), table_name(), index_name(), options()) :: result()
Official document in Chinese | English
Example
import MyApp.TableStore
create_search_index "table", "index_name",
field_schemas: [
field_schema_keyword("name"),
field_schema_integer("age")
]
create_search_index "table", "index_name",
field_schemas: [
field_schema_keyword("name"),
field_schema_geo_point("location"),
field_schema_integer("value")
]
create_search_index "table", "index_name",
field_schemas: [
field_schema_nested(
"content",
field_schemas: [
field_schema_keyword("header"),
field_schema_keyword("body")
]
)
]
Options
:field_schemas, required, a list of predefined search-index schema fields; please see the field_schema_* helper functions of the ExAliyunOts.Search module.
:index_sorts, optional, a list of predefined sort-index schema fields; please see the sort helper functions (e.g. field_sort/2, pk_sort/1) of the ExAliyunOts.Search module, and the sketch below.
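A sketch of creating a search index with a predefined index sort (the field names are placeholders):

import MyApp.TableStore

create_search_index "table", "index_name",
  field_schemas: [
    field_schema_keyword("name"),
    field_schema_integer("age")
  ],
  index_sorts: [
    field_sort("age", order: :asc)
  ]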
@spec delete_search_index(instance(), table_name(), index_name()) :: result()
Official document in Chinese | English
Example
import MyApp.TableStore
delete_search_index("table", "index_name")
@spec describe_search_index(instance(), table_name(), index_name()) :: result()
Official document in Chinese | English
Example
import MyApp.TableStore
describe_search_index("table", "index_name")
iterate_parallel_scan(instance, table_name, index_name, fun, options)
@spec iterate_parallel_scan(instance(), table_name(), index_name(), fun :: (term() -> term()), options()) :: term()
A simple wrapper of stream_parallel_scan/4 that handles the OTSSessionExpired error with a retry, making parallel scan a stream that applies the given function to the complete result of the scan query.
In general, this function is recommended for the common use case of parallel scan.
Options
:scan_query, required, the main option for the query.
- :query, required, bound to the query functions, the same as the :query option of search/4.
- :limit, optional, the limit size of the query, defaults to 2000; the maximum value is 2000.
:columns_to_get, optional, fetch specific fields, by default all fields of the search index are returned; the available options are:
- :all_from_index, return all attribute column fields of the search index;
- :none, do not return any attribute column fields;
- ["field1", "field2"], specifies the expected return attribute column fields.
:timeout, optional, the :timeout option of Task.async_stream/3, defaults to :infinity.
Example
def iterate_stream(stream) do
Enum.map(stream, fn
{:ok, response} ->
response
{:error, error} ->
error
end)
end
iterate_parallel_scan(
"table",
"index",
&iterate_stream/1,
scan_query: [
query: match_query("is_actived", "true"),
limit: 1000
],
columns_to_get: ["is_actived", "name", "score"]
)
iterate_parallel_scan(instance, table_name, index_name, mod, fun, args, options)
@spec iterate_parallel_scan(instance(), table_name(), index_name(), mod :: module(), fun :: atom(), args :: [term()], options()) :: term()
A simple wrapper of stream_parallel_scan/4 that handles the OTSSessionExpired error with a retry, making parallel scan a stream that applies the given function from module with the list of arguments args to the complete result of the scan query.
In general, this function is recommended for the common use case of parallel scan.
Options
Please see options of iterate_parallel_scan/5
.
Example
defmodule StreamHandler do
def iterate_stream(stream) do
Enum.map(stream, fn
{:ok, response} ->
response
{:error, error} ->
error
end)
end
end
iterate_parallel_scan(
"table",
"index",
StreamHandler,
:iterate_stream,
[],
scan_query: [
query: match_query("is_actived", "true"),
limit: 1000
],
columns_to_get: ["field1", "field2"]
)
@spec iterate_search(instance(), table_name(), index_name(), options()) :: result()
A wrapper built on stream_search/4 that fetches the full matched data set as a stream and then uses Enum.reduce/2 to format all data into a list; when processing a large number of items, stream_search/4 is recommended.
Options
Please see options of search/4
for details.
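A usage sketch (the table, index, and query field are placeholders):

import MyApp.TableStore

iterate_search "table", "index_name",
  search_query: [
    query: match_query("age", 28)
  ]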
@spec list_search_index(instance(), table_name()) :: result()
Official document in Chinese | English
Example
import MyApp.TableStore
list_search_index("table")
@spec parallel_scan(instance(), table_name(), index_name(), options()) :: result()
Leverages concurrent tasks to query matched raw data (still through the search function) more quickly. This function speeds up the scan query, but the order of query results is not guaranteed, and aggregations of the scan query are not supported.
In general, iterate_parallel_scan/5 or iterate_parallel_scan/7 is recommended for the common use case of parallel scan.
Official document in Chinese | English
Options
:scan_query, required, the main option for the query.
- :query, required, bound to the query functions, the same as the :query option of search/4.
- :limit, optional, the limit size of the query, defaults to 2000; the maximum value is 2000.
- :token, optional, when not all matched rows are loaded in a single request, a next_token value is returned in the result; pass it as :token in the next identical scan query to continue loading the remaining rows.
:max_parallel, required, the maximum number of concurrent tasks, taken as the splits_size value from the response of compute_splits/3.
:current_parallel_id, required, refer to the official document; the available value is in [0, max_parallel).
:columns_to_get, optional, fetch specific fields, by default all fields of the search index are returned; the available options are:
- :all_from_index, return all attribute column fields of the search index;
- :none, do not return any attribute column fields;
- ["field1", "field2"], specifies the expected return attribute column fields.
:session_id, required as usual, taken from the response of compute_splits/3; if this option is not set, the query result may contain duplicate data. Refer to the official document: once an OTSSessionExpired error occurs, another parallel scan task must be initiated to re-query the data. A complete flow is sketched below.
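Putting the required options together, a single scan task can be issued as in the following sketch, assuming the response of compute_splits/3 exposes splits_size and session_id fields; in practice, prefer the iterate_parallel_scan and stream_parallel_scan wrappers above:

import MyApp.TableStore

{:ok, splits} = compute_splits("table", "index_name")

parallel_scan "table", "index_name",
  scan_query: [
    query: match_query("is_actived", "true"),
    limit: 1000
  ],
  max_parallel: splits.splits_size,
  current_parallel_id: 0,
  session_id: splits.session_id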
@spec search(instance(), table_name(), index_name(), options()) :: result()
The single entry point for the search index functions; please see the ExAliyunOts.Search module for details.
Official document in Chinese | English
Options
:search_query, required, the main option for query and sort.
- :query, required, bound to the query functions:
  ExAliyunOts.Search.bool_query/1
  ExAliyunOts.Search.exists_query/1
  ExAliyunOts.Search.geo_bounding_box_query/3
  ExAliyunOts.Search.geo_distance_query/3
  ExAliyunOts.Search.geo_polygon_query/2
  ExAliyunOts.Search.match_all_query/0
  ExAliyunOts.Search.match_phrase_query/2
  ExAliyunOts.Search.match_query/3
  ExAliyunOts.Search.nested_query/3
  ExAliyunOts.Search.prefix_query/2
  ExAliyunOts.Search.range_query/2
  ExAliyunOts.Search.term_query/2
  ExAliyunOts.Search.terms_query/2
  ExAliyunOts.Search.wildcard_query/2
- :sort, optional, by default it uses pk_sort/1, bound to the sort functions (e.g. field_sort/2, pk_sort/1) of the ExAliyunOts.Search module.
- :aggs, optional, please see the official document in Chinese | English.
- :group_bys, optional, please see the official document in Chinese | English.
- :limit, optional, the limit size of the query.
- :offset, optional, the offset of the query. When the total number of matched rows is less than or equal to 2000, :limit and :offset can be used together for pagination.
- :get_total_count, optional, return the total count of all matched rows, by default it is true.
- :token, optional, when not all matched rows are loaded in a single request, a next_token value is returned in the result; pass it as :token in the next identical search query to continue loading the remaining rows.
- :collapse, optional, deduplicate by the specified field, please see the official document in Chinese; please NOTICE that :collapse currently cannot be used together with :token.
:columns_to_get, optional, fetch specific fields, by default all fields are returned; the available options are:
- :all, return all attribute column fields;
- :none, do not return any attribute column fields;
- ["field1", "field2"], specifies the expected return attribute column fields.
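A usage sketch combining several of the options above (the field names and values are placeholders):

import MyApp.TableStore

search "table", "index_name",
  search_query: [
    query: bool_query(
      must: [range_query("age", from: 20, to: 32)],
      must_not: [term_query("name", "name1")]
    ),
    sort: [field_sort("age", order: :desc)],
    limit: 10
  ],
  columns_to_get: ["name", "age"]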
@spec stream_parallel_scan(instance(), table_name(), index_name(), options()) :: Enumerable.t()
Integrates parallel_scan/4 with compute_splits/3 as a complete flow: based on the response of compute_splits/3, creates the corresponding number of concurrent tasks and uses Task.async_stream/3 to make parallel scan a stream that properly handles the token in every internal request. When using this function, consider the possibility of an OTSSessionExpired error externally.
Options
Please see options of iterate_parallel_scan/5
.
@spec stream_search(instance(), table_name(), index_name(), options()) :: Enumerable.t()
A wrapper built on search/4 to create a composable and lazy enumerable stream for iteration.
Options
Please see options of search/4
for details.
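A usage sketch; as with stream_range/5, each element of the stream is assumed to be an {:ok, response} or {:error, error} tuple:

import MyApp.TableStore

"table"
|> stream_search("index_name", search_query: [query: match_query("age", 28)])
|> Enum.each(fn
  {:ok, response} -> IO.inspect(response)
  {:error, error} -> IO.inspect(error)
end)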
Types
@type column_name() :: String.t()
@type exclusive_end_primary_keys() :: list()
@type inclusive_start_primary_keys() :: list()
@type index_name() :: String.t()
@type instance() :: atom()
@type options() :: Keyword.t()
@type primary_keys() :: list()
@type query() :: String.t()
@type result() :: {:ok, map()} | {:error, ExAliyunOts.Error.t()}
@type row_existence() :: :ignore | :expect_exist | :expect_not_exist
@type table_name() :: String.t()
Functions
@spec alter_table_add_column( instance(), table_name(), column_name(), type :: String.t() ) :: :ok | {:error, ExAliyunOts.Error.t()}
Official document in Chinese | English
Example
import MyApp.TableStore
alter_table_add_column("table", "column", "BOOL")
@spec alter_table_drop_column(instance(), table_name(), column_name()) :: :ok | {:error, ExAliyunOts.Error.t()}
Official document in Chinese | English
Example
import MyApp.TableStore
alter_table_drop_column("table", "column")
@spec create_mapping_table(instance(), query()) :: :ok | {:error, ExAliyunOts.Error.t()}
Official document in Chinese | English
Example
import MyApp.TableStore
create_mapping_table("""
CREATE TABLE table (
id VARCHAR(1024) PRIMARY KEY,
is_actived BOOL,
name MEDIUMTEXT,
score DOUBLE,
tags MEDIUMTEXT
)
""")
@spec describe_mapping_table(instance(), table_name()) :: result()
Official document in Chinese | English
Example
import MyApp.TableStore
describe_mapping_table("table")
@spec drop_mapping_table(instance(), table_name()) :: :ok | {:error, ExAliyunOts.Error.t()}
Official document in Chinese | English
Example
import MyApp.TableStore
drop_mapping_table("table")
Official document in Chinese | English
Example
import MyApp.TableStore
query("SELECT * FROM table LIMIT 20")
Official document in Chinese | English
Example
import MyApp.TableStore
sql_query("SELECT * FROM table LIMIT 20")