franz
Types
pub type AckOrCommit(callback_state)
pub type ClientConfig {
RestartDelaySeconds(Int)
GetMetadataTimeoutSeconds(Int)
ReconnectCoolDownSeconds(Int)
AllowTopicAutoCreation(Bool)
AutoStartProducers(Bool)
DefaultProducerConfig(List(ProducerConfig))
UnknownTopicCacheTtl(Int)
}
Constructors
-
RestartDelaySeconds(Int) -
GetMetadataTimeoutSeconds(Int) -
ReconnectCoolDownSeconds(Int) -
AllowTopicAutoCreation(Bool) -
AutoStartProducers(Bool) -
DefaultProducerConfig(List(ProducerConfig)) -
UnknownTopicCacheTtl(Int)
pub type Compression {
NoCompression
Gzip
Snappy
}
Constructors
-
NoCompression -
Gzip -
Snappy
pub type ConsumerConfig {
BeginOffset(OffsetTime)
MinBytes(Int)
MaxBytes(Int)
MaxWaitTime(Int)
SleepTimeout(Int)
PrefetchCount(Int)
PrefetchBytes(Int)
OffsetResetPolicy(OffsetResetPolicy)
SizeStatWindow(Int)
IsolationLevel(IsolationLevel)
ShareLeaderConn(Bool)
}
Constructors
-
BeginOffset(OffsetTime) -
MinBytes(Int) -
MaxBytes(Int) -
MaxWaitTime(Int) -
SleepTimeout(Int) -
PrefetchCount(Int) -
PrefetchBytes(Int) -
OffsetResetPolicy(OffsetResetPolicy) -
SizeStatWindow(Int) -
IsolationLevel(IsolationLevel) -
ShareLeaderConn(Bool)
pub type ConsumerPartition {
ConsumerPartitions(List(Int))
All
}
Constructors
-
ConsumerPartitions(List(Int)) -
All
pub type FranzClient
pub type FranzError {
UnknownError
ClientDown
UnknownTopicOrPartition
ProducerDown
TopicAlreadyExists
ConsumerNotFound(String)
ProducerNotFound(String, Int)
}
Constructors
-
UnknownError -
ClientDown -
UnknownTopicOrPartition -
ProducerDown -
TopicAlreadyExists -
ConsumerNotFound(String) -
ProducerNotFound(String, Int)
pub type GroupConfig {
SessionTimeoutSeconds(Int)
RebalanceTimeoutSeconds(Int)
HeartbeatRateSeconds(Int)
MaxRejoinAttempts(Int)
RejoinDelaySeconds(Int)
OffsetCommitIntervalSeconds(Int)
OffsetRetentionSeconds(Int)
}
Constructors
-
SessionTimeoutSeconds(Int) -
RebalanceTimeoutSeconds(Int) -
HeartbeatRateSeconds(Int) -
MaxRejoinAttempts(Int) -
RejoinDelaySeconds(Int) -
OffsetCommitIntervalSeconds(Int) -
OffsetRetentionSeconds(Int)
pub type IsolationLevel {
ReadCommitted
ReadUncommitted
}
Constructors
-
ReadCommitted -
ReadUncommitted
pub type KafkaMessage {
KafkaMessage(
offset: Int,
key: BitArray,
value: BitArray,
timestamp_type: TimeStampType,
timestamp: Int,
headers: List(#(String, String)),
)
KafkaMessageSet(
topic: Topic,
partition: Int,
high_wm_offset: Int,
messages: List(KafkaMessage),
)
}
Constructors
-
KafkaMessage( offset: Int, key: BitArray, value: BitArray, timestamp_type: TimeStampType, timestamp: Int, headers: List(#(String, String)), ) -
KafkaMessageSet( topic: Topic, partition: Int, high_wm_offset: Int, messages: List(KafkaMessage), )
pub type MessageType {
Message
MessageSet
}
Constructors
-
Message -
MessageSet
pub type OffsetResetPolicy {
ResetBySubscriber
ResetToEarliest
ResetToLatest
}
Constructors
-
ResetBySubscriber -
ResetToEarliest -
ResetToLatest
pub type OffsetTime {
Earliest
Latest
MessageTimestamp(Int)
}
Constructors
-
Earliest -
Latest -
MessageTimestamp(Int)
pub type Partitioner {
PartitionFun(
fn(String, Int, BitArray, BitArray) -> Result(Int, Nil),
)
Random
Hash
}
Constructors
-
PartitionFun( fn(String, Int, BitArray, BitArray) -> Result(Int, Nil), ) -
Random -
Hash
pub type ProducerConfig {
RequiredAcks(Int)
AckTimeout(Int)
PartitionBufferLimit(Int)
PartitionOnwireLimit(Int)
MaxBatchSize(Int)
MaxRetries(Int)
RetryBackoffMs(Int)
Compression(Compression)
MaxLingerMs(Int)
MaxLingerCount(Int)
}
Constructors
-
RequiredAcks(Int) -
AckTimeout(Int) -
PartitionBufferLimit(Int) -
PartitionOnwireLimit(Int) -
MaxBatchSize(Int) -
MaxRetries(Int) -
RetryBackoffMs(Int) -
Compression(Compression) -
MaxLingerMs(Int) -
MaxLingerCount(Int)
pub type ProducerPartition {
Partition(Int)
Partitioner(Partitioner)
}
Constructors
-
Partition(Int) -
Partitioner(Partitioner)
pub type TimeStampType {
Undefined
Create
Append
}
Constructors
-
Undefined -
Create -
Append
pub type Value {
Value(value: BitArray, headers: List(#(String, String)))
ValueWithTimestamp(
value: BitArray,
timestamp: Int,
headers: List(#(String, String)),
)
}
Constructors
-
Value(value: BitArray, headers: List(#(String, String))) -
ValueWithTimestamp( value: BitArray, timestamp: Int, headers: List(#(String, String)), )
Functions
pub fn ack_return(cb_state: a) -> AckOrCommit(Ack)
pub fn commit_return(cb_state: a) -> AckOrCommit(Commit)
pub fn create_topic(
bootstrap_endpoints: List(#(String, Int)),
topic: String,
partitions: Int,
replication_factor: Int,
) -> Result(Nil, FranzError)
pub fn fetch(
client: FranzClient,
topic: String,
partition: Int,
offset: Int,
) -> Result(#(Int, KafkaMessage), FranzError)
pub fn produce(
client: FranzClient,
topic: String,
partition: ProducerPartition,
key: BitArray,
value: Value,
) -> Result(Nil, FranzError)
pub fn produce_cb(
client: FranzClient,
topic: String,
partition: ProducerPartition,
key: BitArray,
value: Value,
callback: fn(Int, Int) -> a,
) -> Result(Int, FranzError)
pub fn produce_sync(
client: FranzClient,
topic: String,
partition: ProducerPartition,
key: BitArray,
value: Value,
) -> Result(Nil, FranzError)
pub fn produce_sync_offset(
client: FranzClient,
topic: String,
partition: ProducerPartition,
key: BitArray,
value: Value,
) -> Result(Int, FranzError)
pub fn start_client(
bootstrap_endpoints: List(#(String, Int)),
client_config: List(ClientConfig),
) -> Result(FranzClient, FranzError)
pub fn start_consumer(
client: FranzClient,
topic: String,
options: List(ConsumerConfig),
) -> Result(Nil, FranzError)
pub fn start_group_subscriber(
client: FranzClient,
group_id: String,
topics: List(String),
consumer_config: List(ConsumerConfig),
group_config: List(GroupConfig),
message_type: MessageType,
callback: fn(KafkaMessage, a) -> AckOrCommit(b),
init_callback_state: c,
) -> Result(Pid, FranzError)
pub fn start_topic_subscriber(
client: FranzClient,
topic: String,
partitions: ConsumerPartition,
consumer_config: List(ConsumerConfig),
commited_offsets_by_partition: List(#(Int, Int)),
message_type: MessageType,
callback: fn(Int, KafkaMessage, a) -> AckOrCommit(Ack),
init_callback_state: a,
) -> Result(Pid, FranzError)
pub fn stop_client(client: FranzClient) -> Nil