kashka v0.1.0 Kashka.Kafka
Module for making direct requests to the Kafka REST Proxy. See the Confluent REST Proxy API Reference for usage details.
Summary
Functions
Get the list of partitions currently manually assigned to this consumer. See Confluent REST Proxy API Reference
Close HTTP connection.
Commit a list of offsets for the consumer. See Confluent REST Proxy API Reference
Create a new consumer instance in the consumer group. See Confluent REST Proxy API Reference
Destroy the consumer instance. See Confluent REST Proxy API Reference
Destroy the consumer instance. See Confluent REST Proxy API Reference. This function is mostly used in tests when there is only one Kafka REST Proxy instance.
Fetch records for the topics or partitions the consumer is subscribed or assigned to. See Confluent REST Proxy API Reference
Build a connection to an existing consumer. This function is mostly used in tests when there is only one Kafka REST Proxy instance.
Requests offsets for consumer. See Confluent REST Proxy API Reference
Seek to the last offset for each of the given partitions. See Confluent REST Proxy API Reference
Produce messages to a topic. See Confluent REST Proxy API Reference
Subscribe to the given list of topics. See Confluent REST Proxy API Reference
Request the list of all topics. See Confluent REST Proxy API Reference
Types
create_consumer_params()
create_consumer_params() :: %{
  optional(:name) => String.t(),
  optional(:format) => :binary | :json,
  optional(:"auto.offset.reset") => :earliest | :latest,
  optional(:"auto.commit.enable") => true | false,
  optional(:"fetch.min.bytes") => integer(),
  optional(:"consumer.request.timeout.ms") => integer()
}
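Option names that contain dots are atoms that have to be quoted in Elixir. A minimal sketch of a settings map matching create_consumer_params(); all values are illustrative assumptions, not recommended defaults:

```elixir
# Illustrative values only; the keys come from create_consumer_params().
opts = %{
  name: "my_consumer",
  format: :json,
  # Keys containing dots must be written as quoted atoms.
  "auto.offset.reset": :earliest,
  "auto.commit.enable": false,
  "fetch.min.bytes": 1,
  "consumer.request.timeout.ms": 1_000
}
```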
http_error()
http_error() :: {:error, :http, code :: non_neg_integer(), iodata()}
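Because the error body is iodata rather than a binary, it usually needs converting before it is logged or compared. A small sketch of one way to do that; `HttpErrorSketch` is a hypothetical helper and not part of Kashka:

```elixir
defmodule HttpErrorSketch do
  # Hypothetical helper, not part of Kashka: turns an http_error() tuple
  # into a single log-friendly string.
  def describe({:error, :http, code, body}) when is_integer(code) do
    "HTTP #{code}: #{IO.iodata_to_binary(body)}"
  end
end

message = HttpErrorSketch.describe({:error, :http, 404, ["Consumer ", "not found"]})
```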
Functions
assignments(conn)
assignments(Kashka.Http.t()) :: {:ok, Kashka.Http.t(), [any()]} | http_error()
Get the list of partitions currently manually assigned to this consumer. See Confluent REST Proxy API Reference
Parameters
- conn: connection or url for created consumer
Close HTTP connection.
commit(conn, offsets_or_records \\ nil)
commit(Kashka.Http.conn(), [map()] | nil) :: {:ok, Kashka.Http.t()} | http_error()
Commit a list of offsets for the consumer. See Confluent REST Proxy API Reference
Parameters
- conn: connection or url for created consumer
- offsets_or_records: offsets list or records returned from get_records/3
create_consumer(conn, consumer_group, opts)
create_consumer(Kashka.Http.t(), String.t(), create_consumer_params()) :: {:ok, Kashka.Http.t(), map()} | {:error, :exists} | http_error()
Create a new consumer instance in the consumer group. See Confluent REST Proxy API Reference
Parameters
- conn: connection or url for rest proxy instance
- consumer_group: the name of the consumer group to join
- opts: consumer settings
Returns a map with "base_uri" and "instance_id" fields. For example:
%{
"instance_id" => "my_consumer",
"base_uri" => "http://proxy-instance.kafkaproxy.example.com/consumers/testgroup/instances/my_consumer"
}
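A typical next step after creating a consumer is subscribing it to topics. The sketch below chains the two calls; it assumes that the connection returned by create_consumer/3 already targets the new consumer instance, and that a url string is accepted wherever a connection is expected. The module name and the `kafka` argument are hypothetical: `kafka` is only an injection point so the flow can be tried against a stub instead of a running proxy.

```elixir
defmodule ConsumerSetupSketch do
  # `kafka` defaults to Kashka.Kafka; passing another module is a
  # hypothetical hook for trying the flow without a REST Proxy.
  def start(proxy, group, topics, opts \\ %{}, kafka \\ Kashka.Kafka) do
    with {:ok, conn, %{"instance_id" => id}} <- kafka.create_consumer(proxy, group, opts),
         # Assumption: the returned conn already points at the new consumer.
         {:ok, conn} <- kafka.subscribe(conn, topics) do
      {:ok, conn, id}
    end
  end
end
```

Any `{:error, :exists}` or http_error() result falls through the `with` unchanged, so the caller sees the same tuples the underlying functions return.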
delete_consumer(conn)
delete_consumer(Kashka.Http.t()) :: {:ok, Kashka.Http.t()} | http_error()
Destroy the consumer instance. See Confluent REST Proxy API Reference
Parameters
- conn: connection or url for created consumer
delete_consumer(conn, group, name)
delete_consumer(Kashka.Http.t(), String.t(), String.t()) :: {:ok, Kashka.Http.t()} | http_error()
Destroy the consumer instance. See Confluent REST Proxy API Reference. This function is mostly used in tests when there is only one Kafka REST Proxy instance.
Parameters
- conn: connection or url for rest proxy instance
- group: consumer group name
- name: consumer instance_id
get_records(conn, opts, format \\ :json)
get_records(Kashka.Http.conn(), records_query_params(), :json | :binary) :: {:ok, Kashka.Http.t(), [map()]} | http_error()
Fetch records for the topics or partitions the consumer is subscribed or assigned to. See Confluent REST Proxy API Reference
Parameters
- conn: connection or url for created consumer
- opts: query parameters
- format: record format of the created consumer, :json or :binary
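Fetching is usually paired with a commit once the records have been handled. A minimal sketch of a single poll cycle, relying on the documented fact that commit/2 accepts the records returned from get_records/3. `PollSketch`, `handler`, and the `kafka` argument are hypothetical names; `opts` is passed through untouched because the shape of records_query_params() is not shown here.

```elixir
defmodule PollSketch do
  # Fetches one batch, hands each record to `handler`, then commits the batch.
  # `kafka` defaults to Kashka.Kafka and exists only so a stub can be injected.
  def poll_once(conn, opts, handler, kafka \\ Kashka.Kafka) when is_function(handler, 1) do
    with {:ok, conn, records} <- kafka.get_records(conn, opts, :json),
         :ok <- Enum.each(records, handler),
         {:ok, conn} <- kafka.commit(conn, records) do
      {:ok, conn, length(records)}
    end
  end
end
```

Committing only after the handler has run for every record gives at-least-once processing: if the handler raises, nothing is committed and the batch can be fetched again.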
move_to_existing_consumer(conn, group, name)
move_to_existing_consumer(Kashka.Http.t(), String.t(), String.t()) :: Kashka.Http.t()
Build a connection to an existing consumer. This function is mostly used in tests when there is only one Kafka REST Proxy instance.
Parameters
- conn: connection or url for rest proxy instance
- group: consumer group name
- name: consumer instance_id
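One situation where this can be handy is when create_consumer/3 reports `{:error, :exists}`. The sketch below falls back to the existing consumer in that case; it assumes a single REST Proxy instance, as noted above. `EnsureConsumerSketch` and the `kafka` argument are hypothetical and only let the logic be tried against a stub.

```elixir
defmodule EnsureConsumerSketch do
  # Returns a connection to the named consumer, creating it if needed.
  # `kafka` defaults to Kashka.Kafka; another module can be injected for testing.
  def ensure(proxy, group, name, opts \\ %{}, kafka \\ Kashka.Kafka) do
    case kafka.create_consumer(proxy, group, Map.put(opts, :name, name)) do
      {:ok, conn, _info} ->
        {:ok, conn}

      {:error, :exists} ->
        # move_to_existing_consumer/3 returns the connection directly.
        {:ok, kafka.move_to_existing_consumer(proxy, group, name)}

      {:error, :http, _code, _body} = error ->
        error
    end
  end
end
```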
offsets(conn, partitions)
offsets(Kashka.Http.conn(), [partition()]) :: {:ok, Kashka.Http.t(), [any()]} | http_error()
Request the consumer's committed offsets for the given partitions. See Confluent REST Proxy API Reference
Parameters
- conn: connection or url for created consumer
- partitions: a list of partitions like [%{topic: "test", partition: 0}]
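When a topic has several partitions, the list can be built with a comprehension. A short sketch, assuming a hypothetical topic "test" with three partitions numbered 0 to 2:

```elixir
# Hypothetical topic with three partitions, numbered 0..2.
partitions = for p <- 0..2, do: %{topic: "test", partition: p}
```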
positions_end(conn, partitions)
positions_end(Kashka.Http.t(), [partition()]) :: {:ok, Kashka.Http.t()} | http_error()
Seek to the last offset for each of the given partitions. See Confluent REST Proxy API Reference
Parameters
- conn: connection or url for created consumer
- partitions: a list of partitions like [%{topic: "test", partition: 0}]
produce(conn, topic, records, format \\ :json)
produce(Kashka.Http.conn(), String.t(), [json_records() | binary_records()], :json | :binary) :: {:ok, Kashka.Http.t()} | http_error()
Produce messages to a topic. See Confluent REST Proxy API Reference
Parameters
- conn: connection or url for rest proxy
- topic: topic name
- records: records list
- format: record format, :json or :binary
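The json_records() type is not shown on this page, so the record shape in the sketch below is an assumption: each record is taken to be a map with a `:value` key, mirroring the "value" field of the REST Proxy produce request. `ProduceSketch` and the `kafka` argument are hypothetical; check the actual type before relying on this shape.

```elixir
defmodule ProduceSketch do
  # Wraps plain values into records and produces them in JSON format.
  # `kafka` defaults to Kashka.Kafka; another module can be injected for testing.
  def produce_values(conn, topic, values, kafka \\ Kashka.Kafka) do
    # Assumption: a JSON record is a map with a :value key.
    records = Enum.map(values, &%{value: &1})
    kafka.produce(conn, topic, records, :json)
  end
end
```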
subscribe(conn, topics)
subscribe(Kashka.Http.t(), [String.t()]) :: {:ok, Kashka.Http.t()} | http_error()
Subscribe to the given list of topics. See Confluent REST Proxy API Reference
Parameters
- conn: connection or url for created consumer
- topics: a list of topics to subscribe to
topics(conn)
topics(Kashka.Http.conn()) :: {:ok, Kashka.Http.t(), [String.t()]} | http_error()
Request the list of all topics. See Confluent REST Proxy API Reference
Parameters
- conn: connection or url for rest proxy