ExAws.Kinesis.Client behaviour
Defines a Kinesis client.
Usage:
defmodule MyApp.Kinesis do
  use ExAws.Kinesis.Client, otp_app: :my_otp_app
end
In your config:
config :my_otp_app, :ex_aws,
  kinesis: [], # kinesis config goes here
  dynamodb: [] # you get the idea
You can now use MyApp.Kinesis as the root module for the Kinesis API without needing to pass in a particular configuration. This lets different OTP apps keep their AWS configuration separate.
Tying the client to a particular OTP app is, however, entirely optional. The following also works:
defmodule MyApp.Kinesis do
  use ExAws.Kinesis.Client

  def config_root do
    Application.get_all_env(:my_aws_config_root)
  end
end
ExAws now expects the config for that Kinesis client to live under:
config :my_aws_config_root,
  kinesis: [] # Kinesis config goes here
Default config values can be found in ExAws.Config.
http://docs.aws.amazon.com/kinesis/latest/APIReference/API_Operations.html
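Once a client module is defined, its callbacks are called like ordinary functions. The following is a minimal sketch and not part of ExAws: MyApp.KinesisExample and stream_names/1 are hypothetical names. It assumes that ExAws.Request.response_t is an {:ok, term} | {:error, term} tuple and that a successful ListStreams response is a map holding the stream names under "StreamNames", as in the AWS API. The client module is passed in as an argument so the same code works with any client, whichever OTP app configures it.

```elixir
defmodule MyApp.KinesisExample do
  # Hypothetical helper, not part of ExAws.
  # `client` is any module implementing the ExAws.Kinesis.Client callbacks,
  # for example MyApp.Kinesis.
  def stream_names(client) do
    case client.list_streams() do
      # Assumed response shape: a map with a "StreamNames" list.
      {:ok, %{"StreamNames" => names}} -> {:ok, names}
      {:ok, other} -> {:error, {:unexpected_response, other}}
      {:error, reason} -> {:error, reason}
    end
  end
end
```

Under those assumptions, MyApp.KinesisExample.stream_names(MyApp.Kinesis) would return either {:ok, names} or {:error, reason}.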
Types
stream_name :: binary
describe_stream_opts :: [limit: pos_integer, exclusive_start_shard_id: binary]
get_records_opts :: [{:limit, pos_integer}]
stream_records_opts :: [limit: pos_integer, sleep_between_req_time: non_neg_integer]
put_record_opts :: [explicit_hash_key: binary, sequence_number_for_ordering: binary]
put_records_record :: [data: binary, explicit_hash_key: binary]
shard_iterator_types :: :at_sequence_number | :after_sequence_number | :trim_horizon | :latest
get_shard_iterator_opts :: [{:starting_sequence_number, binary}]
stream_tags :: [{atom, binary} | {binary, binary}]
list_tags_for_stream_opts :: [limit: pos_integer, exclusive_start_tag_key: binary]
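To make a couple of these types concrete, here is a small sketch of values that fit stream_tags and list_tags_for_stream_opts. MyApp.KinesisTags and tag_and_list/2 are hypothetical names, the tag keys and values are made up, and the code assumes that a successful call returns an {:ok, term} tuple.

```elixir
defmodule MyApp.KinesisTags do
  # Hypothetical helper, not part of ExAws.
  # `client` is any module implementing the ExAws.Kinesis.Client callbacks.
  def tag_and_list(client, stream_name) do
    # stream_tags allows both atom and binary keys; values are binaries.
    tags = [{:env, "prod"}, {"team", "data"}]

    # list_tags_for_stream_opts is a keyword list; :limit must be a pos_integer.
    opts = [limit: 10]

    # Assumes success is reported as {:ok, term}; anything else is returned as-is.
    with {:ok, _} <- client.add_tags_to_stream(stream_name, tags) do
      client.list_tags_for_stream(stream_name, opts)
    end
  end
end
```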
Callbacks
Specs:
- add_tags_to_stream(stream_name :: binary, tags :: stream_tags) :: ExAws.Request.response_t
Add tags to stream
Specs:
- config_root :: Keyword.t
Retrieves the root AWS config for this client
Specs:
- create_stream(stream_name :: stream_name) :: ExAws.Request.response_t
Creates stream
Specs:
- create_stream(stream_name :: stream_name, shard_count :: pos_integer) :: ExAws.Request.response_t
Specs:
- delete_stream(stream_name :: stream_name) :: ExAws.Request.response_t
Deletes stream
Specs:
- describe_stream(stream_name :: stream_name) :: ExAws.Request.response_t
Describe Stream
Specs:
- describe_stream(stream_name :: stream_name, opts :: describe_stream_opts) :: ExAws.Request.response_t
Specs:
- get_records(shard_iterator :: binary) :: ExAws.Request.response_t
Get stream records
Specs:
- get_records(shard_iterator :: binary, opts :: get_records_opts) :: ExAws.Request.response_t
Specs:
- get_shard_iterator(stream_name :: stream_name, shard_id :: binary, shard_iterator_type :: shard_iterator_types) :: ExAws.Request.response_t
Get a shard iterator
Specs:
- get_shard_iterator(stream_name :: stream_name, shard_id :: binary, shard_iterator_type :: shard_iterator_types, opts :: get_shard_iterator_opts) :: ExAws.Request.response_t
Specs:
- list_streams :: ExAws.Request.response_t
Lists streams
Specs:
- list_tags_for_stream(stream_name :: binary) :: term
List tags for stream
Specs:
- list_tags_for_stream(stream_name :: binary, opts :: list_tags_for_stream_opts) :: ExAws.Request.response_t
Specs:
- merge_shards(stream_name :: stream_name, adjacent_shard_id :: binary, shard_id :: binary) :: ExAws.Request.response_t
Merge adjacent shards
Specs:
- put_record(stream_name :: stream_name, partition_key :: binary, data :: binary) :: ExAws.Request.response_t
Puts a record on a stream
Specs:
- put_record(stream_name :: stream_name, partition_key :: binary, data :: binary, opts :: put_record_opts) :: ExAws.Request.response_t
Specs:
- put_records(stream_name :: stream_name, records :: [put_records_record]) :: ExAws.Request.response_t
Put multiple records on a stream
Specs:
- remove_tags_from_stream(stream_name :: binary, tag_keys :: [binary]) :: ExAws.Request.response_t
Remove tags from stream
Specs:
- request(client :: %{}, data :: %{}, action :: atom) :: term
Enables custom request handling.
By default this just forwards the request to ExAws.Kinesis.Request.request/2. However, this can be overridden in your client to provide pre-request adjustments to headers, params, etc.
Specs:
- split_shard(stream_name :: binary, shard :: binary, new_starting_hash_key :: binary) :: ExAws.Request.response_t
Split a shard
Specs:
- stream_records(shard_iterator :: binary) :: Enumerable.t
Returns a stream of Kinesis records.
NOTE: This stream is effectively INFINITE, in that it runs until the shard it is reading from closes, which may be never. If you want it to take records until there are no more (at the moment), something like
shard_iterator
|> Kinesis.stream_records
|> Enum.take_while(fn(val) -> !match?(%{"Data" => []}, val) end)
ought to do the trick.
The optional iterator_fun (the third argument, named each_req_fun in the spec below) is a function that is called after every actual AWS request. Generally speaking you won’t need this, but it can be handy if you’re trying to prevent flooding. See Mix.Tasks.Kinesis.Tail.get_records/1 for an example.
The sleep_between_req_time is the amount of time that this function will sleep between requests to avoid exceeding the provisioned read capacity. It defaults to 200ms.
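Putting the pieces together, the sketch below obtains a shard iterator and then reads records until the first empty batch, reusing the take_while predicate from the note above. It is an illustration under assumptions, not the library's own code: MyApp.KinesisReader and read_available/3 are hypothetical names, the limit and sleep values are arbitrary, and the "ShardIterator" key is assumed from the AWS GetShardIterator response format.

```elixir
defmodule MyApp.KinesisReader do
  # Hypothetical helper, not part of ExAws.
  # `client` is any module implementing the ExAws.Kinesis.Client callbacks.
  def read_available(client, stream_name, shard_id) do
    # Assumes a successful response is {:ok, map} with a "ShardIterator" key.
    # Any other result (for example {:error, reason}) is returned unchanged.
    with {:ok, %{"ShardIterator" => iterator}} <-
           client.get_shard_iterator(stream_name, shard_id, :trim_horizon) do
      # Arbitrary example values: at most 100 records per request,
      # sleeping 500 ms between requests instead of the default 200 ms.
      stream = client.stream_records(iterator, limit: 100, sleep_between_req_time: 500)

      # Stop at the first element whose "Data" is empty, so the otherwise
      # endless stream terminates once the shard has nothing more to offer.
      records = Enum.take_while(stream, fn val -> !match?(%{"Data" => []}, val) end)

      {:ok, records}
    end
  end
end
```

Because Enum.take_while/2 is eager, the call blocks until the empty batch is seen. Keep the result as a lazy stream with Stream.take_while/2 if records should be processed as they arrive.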
Specs:
- stream_records(shard_iterator :: binary, opts :: stream_records_opts) :: Enumerable.t
Specs:
- stream_records(shard_iterator :: binary, opts :: stream_records_opts, each_req_fun :: Fun) :: Enumerable.t
Specs:
- stream_shards(stream_name :: stream_name) :: Enumerable.t
Stream Shards
Specs:
- stream_shards(stream_name :: stream_name, opts :: describe_stream_opts) :: Enumerable.t