kashka v0.1.0 Kashka.Kafka View Source

Module to make direct requests to Kafka Rest Proxy. Use Confluent REST Proxy API Reference to figure out how to use

Link to this section Summary

Functions

Get the list of partitions currently manually assigned to this consumer. See Confluent REST Proxy API Reference

Close HTTP connection.

Commit a list of offsets for the consumer. See Confluent REST Proxy API Reference

Create a new consumer instance in the consumer group. See Confluent REST Proxy API Reference

Destroy the consumer instance. See Confluent REST Proxy API Reference

Destroy the consumer instance. See Confluent REST Proxy API Reference. This function mostly used in tests when there is only one kafka rest proxy instance.

Build connection to exsisting consumer. This function mostly used in tests when there is only one kafka rest proxy instance

Seek to the last offset for each of the given partitions. See Confluent REST Proxy API Reference

Subscribe to the given list of topics. See Confluent REST Proxy API Reference

Requests all topics list See Confluent REST Proxy API Reference

Link to this section Types

Link to this type

binary_records()

View Source
binary_records() :: %{
  :key => String.t(),
  :value => String.t(),
  optional(:partition) => integer()
}
Link to this type

create_consumer_params()

View Source
create_consumer_params() :: %{
  optional(:name) => String.t(),
  optional(:format) => :binary | :json,
  optional(:"auto.offset.reset") => :earliest | :latest,
  optional(:"auto.commit.enable") => true | false,
  optional(:"fetch.min.bytes") => integer(),
  optional(:"consumer.request.timeout.ms") => integer()
}
Link to this type

http_error()

View Source
http_error() :: {:error, :http, code :: non_neg_integer(), iodata()}
Link to this type

json_records()

View Source
json_records() :: %{
  :key => String.t(),
  :value => any(),
  optional(:partition) => integer()
}
Link to this type

partition()

View Source
partition() :: %{topic: String.t() | atom(), partition: integer()}
Link to this type

records_query_params()

View Source
records_query_params() :: %{
  optional(:timeout) => integer(),
  optional(:max_bytes) => integer()
}

Link to this section Functions

Link to this function

assignments(conn)

View Source
assignments(Kashka.Http.t()) :: {:ok, Kashka.Http.t(), [any()]} | http_error()

Get the list of partitions currently manually assigned to this consumer. See Confluent REST Proxy API Reference

Parameters

  • conn: connection or url for created consumer

Close HTTP connection.

Link to this function

commit(conn, offsets_or_records \\ nil)

View Source
commit(Kashka.Http.conn(), [map()] | nil) ::
  {:ok, Kashka.Http.t()} | http_error()

Commit a list of offsets for the consumer. See Confluent REST Proxy API Reference

Parameters

  • conn: connection or url for created consumer
  • offsets_or_records: offsets list or records returned from get_records/3
Link to this function

create_consumer(conn, consumer_group, opts)

View Source
create_consumer(Kashka.Http.t(), String.t(), create_consumer_params()) ::
  {:ok, Kashka.Http.t(), map()} | {:error, :exists} | http_error()

Create a new consumer instance in the consumer group. See Confluent REST Proxy API Reference

Parameters

  • conn: connection or url for created consumer
  • consumer_group: the name of the consumer group to join
  • opts: consumer settings

Returns map with "base_uri" and "instance_id" fields. For example:

  %{
    "instance_id" => "my_consumer",
    "base_uri" => "http://proxy-instance.kafkaproxy.example.com/consumers/testgroup/instances/my_consumer"
  }
Link to this function

delete_consumer(conn)

View Source
delete_consumer(Kashka.Http.t()) :: {:ok, Kashka.Http.t()} | http_error()

Destroy the consumer instance. See Confluent REST Proxy API Reference

Parameters

  • conn: connection or url for created consumer
Link to this function

delete_consumer(conn, group, name)

View Source
delete_consumer(Kashka.Http.t(), String.t(), String.t()) ::
  {:ok, Kashka.Http.t()} | http_error()

Destroy the consumer instance. See Confluent REST Proxy API Reference. This function mostly used in tests when there is only one kafka rest proxy instance.

Parameters

  • conn: connection or url for rest proxy instance
  • group: consumer group name
  • name: consumer instance_id
Link to this function

get_records(conn, opts, format \\ :json)

View Source
get_records(Kashka.Http.conn(), records_query_params(), :json | :binary) ::
  {:ok, Kashka.Http.t(), [map()]} | http_error()

Produce messages to a topic. See Confluent REST Proxy API Reference

Parameters

  • conn: connection or url for created consumer
  • opts: query Parameters
  • format: created consumer records format: json or binary
Link to this function

move_to_existing_consumer(conn, group, name)

View Source
move_to_existing_consumer(Kashka.Http.t(), String.t(), String.t()) ::
  Kashka.Http.t()

Build connection to exsisting consumer. This function mostly used in tests when there is only one kafka rest proxy instance

Parameters

  • conn: connection or url for rest proxy instance
  • group: consumer group name
  • name: consumer instance_id
Link to this function

offsets(conn, partitions)

View Source
offsets(Kashka.Http.conn(), [partition()]) ::
  {:ok, Kashka.Http.t(), [any()]} | http_error()

Requests offsets for consumer. See Confluent REST Proxy API Reference

Parameters

  • conn: connection or url for created consumer
  • partitions: a list of partitions like [%{topic: "test", partition: 0}]
Link to this function

positions_end(conn, partitions)

View Source
positions_end(Kashka.Http.t(), [partition()]) ::
  {:ok, Kashka.Http.t()} | http_error()

Seek to the last offset for each of the given partitions. See Confluent REST Proxy API Reference

Parameters

  • conn: connection or url for created consumer
Link to this function

produce(conn, topic, records, format \\ :json)

View Source
produce(
  Kashka.Http.conn(),
  String.t(),
  [json_records() | binary_records()],
  :json | :binary
) :: {:ok, Kashka.Http.t()} | http_error()

Produce messages to a topic. See Confluent REST Proxy API Reference

Parameters

  • conn: connection or url for rest proxy
  • topic: topic name
  • records: records list
  • format: records format json or binary
Link to this function

subscribe(conn, topics)

View Source
subscribe(Kashka.Http.t(), [String.t()]) ::
  {:ok, Kashka.Http.t()} | http_error()

Subscribe to the given list of topics. See Confluent REST Proxy API Reference

Parameters

  • conn: connection or url for rest proxy
  • topics: a list of topics to subscribe

Requests all topics list See Confluent REST Proxy API Reference

Parameters

  • conn: connection or url for rest proxy