erlmld_batch_processor (erlmld v1.1.0)

Generic record processor with support for batching and checkpointing.

We use an underlying 'erlmld_flusher' module to maintain the current batch. When the current batch is full (according to the flusher's definition of fullness), or when 'flush_interval_ms' milliseconds have elapsed, we ask the flusher to flush the current batch and start a new one.
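The flush trigger described above can be illustrated with a minimal sketch. This is not the module's actual code: the module name `flush_sketch`, the function `should_flush/3`, and its arguments are hypothetical, and the flusher's own notion of fullness is reduced to a boolean supplied by the caller.

```erlang
-module(flush_sketch).
-export([should_flush/3]).

%% Hypothetical helper, not part of erlmld.
%% BatchFull       : true when the flusher reports the current batch as full.
%% ElapsedMs       : milliseconds since the current batch was started.
%% FlushIntervalMs : the configured 'flush_interval_ms'.
%% A full batch is flushed right away; otherwise we flush only once the
%% flush interval has elapsed.
should_flush(true, _ElapsedMs, _FlushIntervalMs) ->
    true;
should_flush(false, ElapsedMs, FlushIntervalMs) ->
    ElapsedMs >= FlushIntervalMs.
```

In this sketch, whenever `should_flush/3` returns `true` the caller would ask the flusher to flush and then start a new batch.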

We checkpoint every 'checkpoint_interval_ms' milliseconds. After 'watchdog_timeout_ms' milliseconds with no activity (counted only once there has been some initial activity), the current process is killed. Checkpointing works by keeping track of the latest record such that it and all of its predecessors have been flushed. For example, suppose 'process_record' has been called with the following records, and the flusher has already flushed the ones marked with '*':

    R1 R2 R3 R4 R5 R6 R7 R8 R9 R10
    *  *  *     (three of R5 to R10 are also marked)

Then the stream will be checkpointed at (the sequence number of) R3: R4 has not been flushed yet, so the flushed records after it cannot advance the checkpoint. For the internal mechanics, see the comment in 'note_success'.
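The checkpointing rule can be sketched as "find the last record of the longest fully flushed prefix". The code below is only an illustration of that rule, not the bookkeeping done in 'note_success'; the module name `checkpoint_sketch`, the function `checkpointable/2`, and the use of atoms as record ids are all assumptions made for the example.

```erlang
-module(checkpoint_sketch).
-export([checkpointable/2]).

%% Hypothetical helper, not part of erlmld.
%% Records : record ids in the order 'process_record' received them.
%% Flushed : ids of the records the flusher has already flushed.
%% Returns the id of the latest record such that it and all of its
%% predecessors have been flushed, or 'undefined' if the very first
%% record is still pending.
checkpointable(Records, Flushed) ->
    FlushedSet = sets:from_list(Flushed),
    IsFlushed = fun(Id) -> sets:is_element(Id, FlushedSet) end,
    %% takewhile stops at the first unflushed record, so later flushed
    %% records do not move the checkpoint forward.
    case lists:takewhile(IsFlushed, Records) of
        [] -> undefined;
        Prefix -> lists:last(Prefix)
    end.
```

For instance, with records `[r1, r2, r3, r4, r5, r6, r7, r8, r9, r10]` and flushed ids `[r1, r2, r3, r6, r8, r9]` (the last three chosen arbitrarily for this example), `checkpointable/2` returns `r3`.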

Functions

checkpointed(State, SequenceNumber, Checkpoint)

initialize(Opts, ShardId, ISN)
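
As a rough illustration of the timing options named in the module description, the sketch below builds an options term containing only those three settings. It assumes that `Opts` is a map keyed by the option names; the module name `opts_sketch`, the function `interval_opts/0`, and all of the values are hypothetical. Any other options that `initialize/3` may require (for example, how the flusher module is specified) are not shown on this page and are deliberately left out.

```erlang
-module(opts_sketch).
-export([interval_opts/0]).

%% Hypothetical helper, not part of erlmld. Assumes Opts is a map.
%% The values are arbitrary and chosen only for illustration.
interval_opts() ->
    #{flush_interval_ms => 10000,        % flush a non-full batch after 10 s
      checkpoint_interval_ms => 60000,   % checkpoint once a minute
      watchdog_timeout_ms => 600000}.    % kill after 10 min of inactivity
```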

process_record(State, Record)
