erlmld_worker behaviour (erlmld v1.1.1)

KCL MultiLangDaemon worker (record processor) behavior.

A worker has the following lifecycle:


When a shard lease has been obtained, a worker is initialized to process records appearing on that shard. It is provided the opaque data which was supplied to erlmld_sup, the shard name, and the initial sequence number(s) (which may be undefined for new shards or when using the V1 protocol). It returns an opaque worker_state() value, which is passed to subsequent callbacks such as process_record/2 and shutdown/2.
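The initialization step above could be sketched as follows. This is a minimal illustration rather than code from the library: the module name, the #state{} record, and its fields are hypothetical, and the worker state can be any term.

```erlang
-module(example_init_worker).
-export([initialize/3]).

%% Hypothetical worker state. The behaviour treats it as an opaque term(),
%% so any shape works; a record is just convenient.
-record(state, {handler_data,     %% opaque data that was supplied to erlmld_sup
                shard_id,         %% shard name, a binary()
                initial_seqno}).  %% sequence_number() | undefined

%% Called once a lease on ShardId has been obtained.  ISN may be
%% undefined for new shards or when the V1 protocol is in use.
initialize(HandlerData, ShardId, ISN) when is_binary(ShardId) ->
    {ok, #state{handler_data = HandlerData,
                shard_id = ShardId,
                initial_seqno = ISN}};
initialize(_HandlerData, ShardId, _ISN) ->
    %% An error response is fatal, so reserve it for unrecoverable input.
    {error, {bad_shard_id, ShardId}}.
```

A real worker module would normally also declare -behaviour(erlmld_worker) and export the remaining callbacks; they are left out here to keep the sketch short.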

As records are read from the stream, they are base64-decoded and passed to process_record/2. If a record was put on the stream using KPL aggregation, it is also deaggregated, and each sub-record is provided to the worker as a single record along with a subsequence number.

After processing each record, a worker returns an updated worker_state(). It may also return a checkpoint() (not necessarily the latest) containing a sequence_number() from that record or a previous record, which will result in an attempt to checkpoint the stream at the associated sequence number. If the supplied checkpoint() has an undefined sequence number, the stream is checkpointed at the most recent sequence number.
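One way a worker might use the optional checkpoint() return value is to checkpoint every N records. The sketch below makes several assumptions: the two record definitions are local stand-ins for the library's #checkpoint{} and #stream_record{} records (a real worker would include the library's header instead, and the real records may have more fields), and the module name, #state{} record, handle_data/1 helper, and CHECKPOINT_EVERY value are hypothetical.

```erlang
-module(example_counting_worker).
-export([process_record/2]).

%% Local stand-ins (assumption) for the library's record definitions,
%% reduced to the fields this sketch needs.
-record(checkpoint, {sequence_number}).
-record(stream_record, {sequence_number, data}).

%% Hypothetical worker state: number of records seen since the last
%% checkpoint request.
-record(state, {since_checkpoint = 0}).

%% Hypothetical checkpoint interval, in records.
-define(CHECKPOINT_EVERY, 100).

process_record(#state{since_checkpoint = N} = State,
               #stream_record{sequence_number = SN, data = Data}) ->
    ok = handle_data(Data),
    case N + 1 of
        M when M >= ?CHECKPOINT_EVERY ->
            %% Request a checkpoint at this record's sequence number.
            {ok, State#state{since_checkpoint = 0},
             #checkpoint{sequence_number = SN}};
        M ->
            %% No checkpoint this time; only the state is updated.
            {ok, State#state{since_checkpoint = M}}
    end.

%% Placeholder for application-specific processing.
handle_data(_Data) ->
    ok.
```

Returning #checkpoint{} with the sequence number left undefined would instead request a checkpoint at the most recent sequence number, as described above.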

Before starting to process each batch of records, a worker's ready/1 callback is called, which should return a possibly-updated worker state and possibly a checkpoint. This can be useful when a record processor is using a watchdog timer and is far behind on a stream (and so won't receive any actual records for a while), or if a stream has very low volume (records seen less frequently than desired checkpoint or flush intervals).
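For the low-volume case described above, ready/1 could issue a time-based checkpoint. This is only a sketch under stated assumptions: the #checkpoint{} definition is a local stand-in for the library's record, the #state{} record and the interval are hypothetical, and last_checkpoint_ms is assumed to have been set from erlang:monotonic_time(millisecond) when the state was created.

```erlang
-module(example_ready_worker).
-export([ready/1]).

%% Local stand-in (assumption) for the library's checkpoint record.
-record(checkpoint, {sequence_number}).

%% Hypothetical worker state: monotonic time of the last checkpoint request.
-record(state, {last_checkpoint_ms}).

%% Hypothetical minimum time between checkpoint requests.
-define(CHECKPOINT_INTERVAL_MS, 60000).

ready(#state{last_checkpoint_ms = Last} = State) ->
    Now = erlang:monotonic_time(millisecond),
    case Now - Last >= ?CHECKPOINT_INTERVAL_MS of
        true ->
            %% An undefined sequence number requests a checkpoint at the
            %% most recent sequence number.
            {ok, State#state{last_checkpoint_ms = Now}, #checkpoint{}};
        false ->
            {ok, State}
    end.
```

Checkpointing at the most recent sequence number is only appropriate if the worker has fully handled everything it has received so far; a worker that buffers records would want to flush first or supply an explicit sequence number.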

When a shard lease has been lost or a shard has been completely processed, a worker will be shut down. If the lease was lost, the worker will receive a reason of 'zombie', and it should not checkpoint (and any checkpoint response is in error). If the shard was closed, the reason will be 'terminate' and the worker should return a checkpoint response. That checkpoint should either have an undefined sequence number, or it should be the most recent sequence number which was provided to process_record/2.
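The two shutdown reasons could be handled along these lines. The #checkpoint{} definition is again a local stand-in for the library's record, and the module name is hypothetical.

```erlang
-module(example_shutdown_worker).
-export([shutdown/2]).

%% Local stand-in (assumption) for the library's checkpoint record.
-record(checkpoint, {sequence_number}).

shutdown(_State, zombie) ->
    %% The lease was lost: the worker must not checkpoint.
    ok;
shutdown(_State, terminate) ->
    %% The shard was closed: return a checkpoint.  Leaving the sequence
    %% number undefined checkpoints at the most recent sequence number.
    {ok, #checkpoint{}}.
```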

If any worker callback returns an error response ({error, term()}), the error is fatal.


Types

-type checkpoint() :: #checkpoint{}.
-type sequence_number() :: #sequence_number{}.
-type shard_id() :: binary().
-type shutdown_reason() :: terminate | zombie.
-type stream_record() :: #stream_record{}.
-type worker_state() :: term().

Callbacks

-callback checkpointed(worker_state(), sequence_number(), checkpoint()) ->
                {ok, worker_state()} | {error, term()}.
-callback initialize(term(), shard_id(), sequence_number() | undefined) ->
              {ok, worker_state()} | {error, term()}.
-callback process_record(worker_state(), stream_record()) ->
                  {ok, worker_state()} | {ok, worker_state(), checkpoint()} | {error, term()}.
-callback ready(worker_state()) ->
         {ok, worker_state()} | {ok, worker_state(), checkpoint()} | {error, term()}.
-callback shutdown(worker_state(), shutdown_reason()) -> ok | {ok, checkpoint()} | {error, term()}.

Functions

checkpointed(Mod, WorkerState, SeqNumber, Checkpoint)

-spec checkpointed(module(), worker_state(), sequence_number(), checkpoint()) ->
                {ok, worker_state()} | {error, term()}.

initialize(Mod, HandlerData, ShardId, ISN)

-spec initialize(module(), term(), shard_id(), sequence_number() | undefined) ->
              {ok, worker_state()} | {error, term()}.

process_record(Mod, WorkerState, Record)

-spec process_record(module(), worker_state(), stream_record()) ->
                  {ok, worker_state()} | {ok, worker_state(), checkpoint()} | {error, term()}.

ready(Mod, WorkerState)

-spec ready(module(), worker_state()) ->
         {ok, worker_state()} | {ok, worker_state(), checkpoint()} | {error, term()}.

shutdown(Mod, WorkerState, Reason)

-spec shutdown(module(), worker_state(), shutdown_reason()) -> ok | {ok, checkpoint()} | {error, term()}.