ExAws v0.4.19 ExAws.Kinesis.Client behaviour

Defines a Kinesis client.

Usage:

defmodule MyApp.Kinesis do
  use ExAws.Kinesis.Client, otp_app: :my_otp_app
end

In your config:

config :my_otp_app, :ex_aws,
  kinesis:  [], # kinesis config goes here
  dynamodb: []  # you get the idea

You can now use MyApp.Kinesis as the root module for the Kinesis API without needing to pass in a particular configuration. This lets different OTP apps keep their AWS configuration separate.

Tying a client to a particular OTP app is entirely optional, however. The following also works:

defmodule MyApp.Kinesis do
  use ExAws.Kinesis.Client

  def config_root do
    Application.get_all_env(:my_aws_config_root)
  end
end

ExAws now expects the config for that Kinesis client to live under:

config :my_aws_config_root,
  kinesis: [] # Kinesis config goes here

Default config values can be found in ExAws.Config.
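To make the lookup concrete, here is a minimal sketch of what a config_root/0 like the one above returns once something is stored under the application name. It runs without ExAws; the module name ConfigRootSketch and the :region key with its value are assumptions made for illustration, not settings confirmed by this page.

```elixir
# Store a Kinesis section under the app name, as a config file would.
# The :region key and its value are illustrative assumptions.
Application.put_env(:my_aws_config_root, :kinesis, [region: "us-east-1"])

defmodule ConfigRootSketch do
  # Mirrors the config_root/0 shown above: return every key stored for the app.
  def config_root do
    Application.get_all_env(:my_aws_config_root)
  end
end

# A client would read its own section out of the root config.
kinesis_config = Keyword.get(ConfigRootSketch.config_root(), :kinesis, [])
IO.inspect(kinesis_config)
```

Because the root is an ordinary keyword list, sections for several services can sit side by side under the same application name.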

http://docs.aws.amazon.com/kinesis/latest/APIReference/API_Operations.html

Summary

Callbacks

config_root()
Retrieves the root AWS config for this client

get_records(shard_iterator)
Get stream records

list_streams()
Lists streams

put_records(stream_name, records)
Put multiple records on a stream

request(client, data, action)
Enables custom request handling

stream_records(shard_iterator)
Returns a stream of Kinesis records

stream_shards(stream_name)
Same as describe_stream/1,2, except that the Shards key is a stream and pagination is handled automatically

Types

describe_stream_opts :: [limit: pos_integer, exclusive_start_shard_id: binary]
get_records_opts :: [{:limit, pos_integer}]
get_shard_iterator_opts :: [{:starting_sequence_number, binary}]
list_tags_for_stream_opts :: [limit: pos_integer, exclusive_start_tag_key: binary]
put_record_opts :: [explicit_hash_key: binary, sequence_number_for_ordering: binary]
put_records_record :: [data: binary, explicit_hash_key: binary]
shard_iterator_types ::
  :at_sequence_number |
  :after_sequence_number |
  :trim_horizon |
  :latest
stream_name :: binary
stream_records_opts :: [limit: pos_integer, sleep_between_req_time: non_neg_integer]
stream_tags :: [{atom, binary} | {binary, binary}]
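As a quick illustration of the option types above, the following sketch builds one value of each shape. In Elixir typespecs a list such as [limit: pos_integer, exclusive_start_shard_id: binary] describes a keyword list in which each key may or may not be present, so any subset is valid. All concrete values here are placeholders chosen for illustration.

```elixir
# Example values for the option types; every value is an illustrative placeholder.
option_examples = [
  describe_stream_opts: [limit: 10, exclusive_start_shard_id: "shardId-000000000000"],
  get_records_opts: [limit: 100],
  get_shard_iterator_opts: [starting_sequence_number: "12345"],
  put_record_opts: [explicit_hash_key: "0"],
  stream_records_opts: [limit: 100, sleep_between_req_time: 500]
]

Enum.each(option_examples, fn {type_name, opts} ->
  IO.puts("#{type_name}: #{inspect(opts)}")
end)

# stream_tags accepts atom or binary keys; only the atom-keyed form is a keyword list.
atom_keyed_tags = [env: "staging", team: "data"]
binary_keyed_tags = [{"env", "staging"}, {"team", "data"}]

IO.inspect(Keyword.keyword?(atom_keyed_tags))
IO.inspect(Keyword.keyword?(binary_keyed_tags))
```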

Callbacks

add_tags_to_stream(stream_name, tags)

Specs

add_tags_to_stream(stream_name :: binary, tags :: stream_tags) :: ExAws.Kinesis.Request.response_t

Add tags to stream

config_root()

Specs

config_root :: Keyword.t

Retrieves the root AWS config for this client

create_stream(stream_name)

Specs

create_stream(stream_name :: stream_name) :: ExAws.Kinesis.Request.response_t

Creates stream

create_stream(stream_name, shard_count)

Specs

create_stream(stream_name :: stream_name, shard_count :: pos_integer) :: ExAws.Kinesis.Request.response_t

delete_stream(stream_name)

Specs

delete_stream(stream_name :: stream_name) :: ExAws.Kinesis.Request.response_t

Deletes stream

describe_stream(stream_name)

Specs

describe_stream(stream_name :: stream_name) :: ExAws.Kinesis.Request.response_t

Describe Stream

describe_stream(stream_name, opts)

Specs

describe_stream(stream_name :: stream_name, opts :: describe_stream_opts) :: ExAws.Kinesis.Request.response_t

get_records(shard_iterator)

Specs

get_records(shard_iterator :: binary) :: ExAws.Kinesis.Request.response_t

Get stream records

get_records(shard_iterator, opts)

Specs

get_records(shard_iterator :: binary, opts :: get_records_opts) :: ExAws.Kinesis.Request.response_t

get_shard_iterator(stream_name, shard_id, shard_iterator_type)

Specs

get_shard_iterator(stream_name :: stream_name, shard_id :: binary, shard_iterator_type :: shard_iterator_types) :: ExAws.Kinesis.Request.response_t

Get a shard iterator

get_shard_iterator(stream_name, shard_id, shard_iterator_type, opts)

Specs

get_shard_iterator(stream_name :: stream_name, shard_id :: binary, shard_iterator_type :: shard_iterator_types, opts :: get_shard_iterator_opts) :: ExAws.Kinesis.Request.response_t

list_streams()

Specs

list_streams :: ExAws.Kinesis.Request.response_t

Lists streams

list_tags_for_stream(stream_name)

Specs

list_tags_for_stream(stream_name :: binary) :: term

List tags for stream

list_tags_for_stream(stream_name, opts)

Specs

list_tags_for_stream(stream_name :: binary, opts :: list_tags_for_stream_opts) :: ExAws.Kinesis.Request.response_t

merge_shards(stream_name, adjacent_shard_id, shard_id)

Specs

merge_shards(stream_name :: stream_name, adjacent_shard_id :: binary, shard_id :: binary) :: ExAws.Kinesis.Request.response_t

Merge adjacent shards

put_record(stream_name, partition_key, data)

Specs

put_record(stream_name :: stream_name, partition_key :: binary, data :: binary) :: ExAws.Kinesis.Request.response_t

Puts a record on a stream

put_record(stream_name, partition_key, data, opts)

Specs

put_record(stream_name :: stream_name, partition_key :: binary, data :: binary, opts :: put_record_opts) :: ExAws.Kinesis.Request.response_t

put_records(stream_name, records)

Specs

put_records(stream_name :: stream_name, records :: [put_records_record]) :: ExAws.Kinesis.Request.response_t

Put multiple records on a stream

remove_tags_from_stream(stream_name, tag_keys)

Specs

remove_tags_from_stream(stream_name :: binary, tag_keys :: [binary]) :: ExAws.Kinesis.Request.response_t

Remove tags from stream

request(client, data, action)

Specs

request(client :: %{}, data :: %{}, action :: atom) :: term

Enables custom request handling.

By default this just forwards the request to ExAws.Kinesis.Request.request/2. However, this can be overridden in your client to make pre-request adjustments to headers, params, etc.
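The override mechanism can be sketched with the general defoverridable pattern. This is not ExAws code: the RequestSketch modules are hypothetical stand-ins, the default here simply echoes its arguments, and the "Tagged" key is a placeholder adjustment rather than a real Kinesis parameter.

```elixir
defmodule RequestSketch.Default do
  # Stand-in for the real request function: echoes what it was given.
  def request(client, data, action) do
    {:ok, %{client: client, data: data, action: action}}
  end
end

defmodule RequestSketch.Client do
  defmacro __using__(_opts) do
    quote do
      # Default implementation forwards the request unchanged.
      def request(client, data, action) do
        RequestSketch.Default.request(client, data, action)
      end

      # Allow the using module to redefine request/3 and call super.
      defoverridable request: 3
    end
  end
end

defmodule RequestSketch.MyClient do
  use RequestSketch.Client

  # Pre-request adjustment: modify the payload, then delegate to the default.
  def request(client, data, action) do
    super(client, Map.put(data, "Tagged", true), action)
  end
end

{:ok, result} =
  RequestSketch.MyClient.request(%{}, %{"StreamName" => "my-stream"}, :describe_stream)

IO.inspect(result.data)
```

Calling super keeps the default forwarding in one place, so an override only has to express the adjustment itself.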

split_shard(stream_name, shard, new_starting_hash_key)

Specs

split_shard(stream_name :: binary, shard :: binary, new_starting_hash_key :: binary) :: ExAws.Kinesis.Request.response_t

Split a shard

stream_records(shard_iterator)

Specs

stream_records(shard_iterator :: binary) :: Enumerable.t

Returns a stream of Kinesis records.

NOTE: This stream is effectively infinite: it runs until the shard it is reading from closes, which may be never. If you want it to take records only until there are none left at the moment, something like

shard_iterator # a shard iterator string, e.g. taken from a get_shard_iterator/3 response
|> Kinesis.stream_records
|> Enum.take_while(fn(val) -> !match?(%{"Data" => []}, val) end)

ought to do the trick.

The optional each_req_fun is a function that is called after every actual AWS request. Generally speaking you won't need this, but it can be handy if you're trying to prevent flooding. See Mix.Tasks.Kinesis.Tail.get_records/1 for an example.

The sleep_between_req_time option is the time in milliseconds that this function sleeps between requests to avoid exceeding the provisioned read capacity. It defaults to 200 ms.
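The three behaviours described above (an unbounded stream, a per-request callback, and a pause between requests) can be sketched with Stream.resource/3. This is a simulation under stated assumptions, not the library's implementation: PollingSketch and fetch_fun are hypothetical, the fake shard uses integers as iterators, and passing the fetched records to each_req_fun is a guess at its arguments.

```elixir
defmodule PollingSketch do
  # fetch_fun is a hypothetical stand-in for one GetRecords call:
  # it takes an iterator and returns {records, next_iterator}.
  def stream_records(iterator, fetch_fun, opts \\ [], each_req_fun \\ fn _records -> :ok end) do
    sleep_ms = Keyword.get(opts, :sleep_between_req_time, 200)

    Stream.resource(
      fn -> iterator end,
      fn current ->
        {records, next} = fetch_fun.(current)
        # Called once per request, whether or not records came back.
        each_req_fun.(records)
        # Pause so that consecutive requests are spaced out.
        Process.sleep(sleep_ms)
        # An empty batch emits nothing and the stream simply polls again.
        {records, next}
      end,
      fn _last_iterator -> :ok end
    )
  end
end

# Fake shard: iterators are integers and every request returns two records.
fetch = fn n ->
  {[%{"Data" => "record-#{2 * n}"}, %{"Data" => "record-#{2 * n + 1}"}], n + 1}
end

taken =
  0
  |> PollingSketch.stream_records(fetch, [sleep_between_req_time: 10], fn records ->
    IO.puts("fetched #{length(records)} records")
  end)
  |> Enum.take(3)

IO.inspect(taken)
```

Because the stream never halts on its own, the consumer decides when to stop, here with Enum.take/2.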

stream_records(shard_iterator, opts)

Specs

stream_records(shard_iterator :: binary, opts :: stream_records_opts) :: Enumerable.t

stream_records(shard_iterator, opts, each_req_fun)

Specs

stream_records(shard_iterator :: binary, opts :: stream_records_opts, each_req_fun :: Fun) :: Enumerable.t

stream_shards(stream_name)

Specs

stream_shards(stream_name :: stream_name) :: Enumerable.t

Same as describe_stream/1,2, except that pagination is handled automatically. Returns the normally shaped AWS response, except that the Shards key is now a stream.
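Automatic pagination of this kind can be sketched as a lazy stream that requests the next page only when the consumer needs it. The sketch below simulates the paging with an in-memory map. ShardPagingSketch and describe_fun are hypothetical; the "Shards", "HasMoreShards" and "ShardId" keys follow the AWS DescribeStream response, and how the library itself wires them together is an assumption.

```elixir
defmodule ShardPagingSketch do
  # describe_fun is a hypothetical stand-in for one describe_stream call: it takes
  # the exclusive start shard id (nil for the first page) and returns one page.
  def shard_stream(describe_fun) do
    Stream.resource(
      fn -> {:fetch, nil} end,
      fn
        :done ->
          {:halt, :done}

        {:fetch, start_id} ->
          %{"Shards" => shards, "HasMoreShards" => more?} = describe_fun.(start_id)

          # Continue from the last shard seen only if AWS reports more pages.
          next =
            case {more?, List.last(shards)} do
              {true, %{"ShardId" => last_id}} -> {:fetch, last_id}
              _ -> :done
            end

          {shards, next}
      end,
      fn _state -> :ok end
    )
  end
end

# Two fake pages keyed by the exclusive start shard id.
pages = %{
  nil => %{
    "Shards" => [%{"ShardId" => "shardId-000000000000"}, %{"ShardId" => "shardId-000000000001"}],
    "HasMoreShards" => true
  },
  "shardId-000000000001" => %{
    "Shards" => [%{"ShardId" => "shardId-000000000002"}],
    "HasMoreShards" => false
  }
}

describe = fn start_id -> Map.fetch!(pages, start_id) end

shard_ids =
  describe
  |> ShardPagingSketch.shard_stream()
  |> Enum.map(fn shard -> shard["ShardId"] end)

IO.inspect(shard_ids)
```

The second page is fetched only after the first two shards have been consumed, which is what lets a caller stop early without paying for unused requests.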

stream_shards(stream_name, opts)

Specs

stream_shards(stream_name :: stream_name, opts :: describe_stream_opts) :: Enumerable.t