erlmld_wrk_statem (erlmld v1.1.0)

TCP protocol handler implementing the MultiLangDaemon protocol (V1 and V2).

The MultiLangDaemon protocol is a line-based request-response protocol with a JSON transport encoding and no pipelining. Each request is a single JSON object terminated by a newline, and each request must receive a response.

Requests are of the form:

{ "action": ACTION-NAME, ... action-specific data } \n
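Because the protocol is line-based, a handler has to split the incoming byte stream into complete lines before decoding any JSON. The following is a minimal sketch of that framing step, not the module's actual implementation; the module name `mld_framing_sketch` and the function `take_lines/1` are hypothetical. It skips blank lines, which the MLD also tolerates, and assumes a bare `\n` terminator.

```erlang
-module(mld_framing_sketch).
-export([take_lines/1]).

%% Split a receive buffer into the complete, non-blank lines it contains
%% and the unterminated remainder, which should be kept until more data
%% arrives on the socket.
-spec take_lines(binary()) -> {[binary()], binary()}.
take_lines(Buffer) ->
    take_lines(Buffer, []).

take_lines(Buffer, Acc) ->
    case binary:split(Buffer, <<"\n">>) of
        [Rest] ->
            %% No newline left: Rest is an incomplete (possibly empty) line.
            {lists:reverse(Acc), Rest};
        [<<>>, Rest] ->
            %% Blank line: ignore it and keep scanning.
            take_lines(Rest, Acc);
        [Line, Rest] ->
            take_lines(Rest, [Line | Acc])
    end.
```

Each returned line can then be handed to whatever JSON decoder the application uses, and the remainder is prepended to the next chunk read from the socket.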

Responses are of the form:


MLD V1 requests:

{ "action": "initialize", "shardId": SHARD-ID }

{ "action": "shutdown", "reason": REASON }

{ "action": "processRecords",
  "records": [{ "data": B64-DATA,    % may contain KPL aggregate record data
                "partitionKey": PARTITION-KEY,
                "sequenceNumber": SEQNO }] }

MLD V2 requests:

{ "action": "initialize", "shardId": SHARD-ID, "sequenceNumber": SEQNO, "subSequenceNumber": SUBSEQNO }

{ "action": "shutdown", "reason": REASON }

{ "action": "processRecords",
  "millisBehindLatest": DELAY-MS,
  "records": [{ "action": "record",
                "data": B64-DATA,    % does not contain KPL aggregate record data
                "partitionKey": PARTITION-KEY,
                "approximateArrivalTimestamp": WRITE-TIME-MS,    % epoch UTC ms when written to stream
                "sequenceNumber": SEQNO,
                "subSequenceNumber": SUBSEQNO }] }

{ "action": "shutdownRequested" }
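As an illustration of the request shapes above, the sketch below extracts the records from a "processRecords" request and base64-decodes each payload. It assumes the request line has already been decoded into a map with binary keys by some JSON decoder (the choice of decoder is an assumption, not something this page specifies). The module name, function names, and the keys of the returned maps are hypothetical.

```erlang
-module(mld_records_sketch).
-export([decode_records/1]).

%% Extract the records of an already-decoded "processRecords" request.
%% Works for both V1 and V2 requests.
-spec decode_records(map()) -> [map()].
decode_records(#{<<"action">> := <<"processRecords">>,
                 <<"records">> := Records}) ->
    [decode_record(R) || R <- Records].

decode_record(#{<<"data">> := B64,
                <<"partitionKey">> := Key,
                <<"sequenceNumber">> := SeqNo} = Record) ->
    #{data => base64:decode(B64),
      partition_key => Key,
      sequence_number => SeqNo,
      %% Only V2 requests carry a subsequence number; V1 yields undefined.
      sub_sequence_number =>
          maps:get(<<"subSequenceNumber">>, Record, undefined)}.
```

Any other action fails with a `function_clause` error here, which a real handler would more likely route to its own clauses.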

Worker responses:

{ "action": "status", "responseFor": REQUEST-ACTION-NAME }
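A worker's status response can be assembled without a JSON library, since the only variable part is the name of the action being acknowledged. This is a sketch under that assumption: it performs no JSON escaping, so it is only suitable for the fixed action names listed above. The module and function names are hypothetical.

```erlang
-module(mld_status_sketch).
-export([status_response/1]).

%% Build the newline-terminated status response for a request action,
%% e.g. <<"initialize">> or <<"processRecords">>.
%% No escaping is done; Action must be a plain action name.
-spec status_response(binary()) -> iodata().
status_response(Action) when is_binary(Action) ->
    [<<"{\"action\":\"status\",\"responseFor\":\"">>, Action, <<"\"}\n">>].
```

The result is iodata, so it can be passed directly to `gen_tcp:send/2` without flattening.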

Worker V1 requests:

{ "action": "checkpoint", "checkpoint": SEQNO }

Worker V2 requests:

{ "action": "checkpoint", "sequenceNumber": SEQNO, "subSequenceNumber": SUBSEQNO }
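The difference between the two checkpoint request versions can be sketched as follows. The example assumes that sequence numbers are sent as JSON strings and subsequence numbers as JSON integers; this page does not state the value types, so treat that as an assumption. The module name, function name, and the `v1`/`v2` atoms are hypothetical.

```erlang
-module(mld_checkpoint_sketch).
-export([checkpoint_request/2]).

%% Build a newline-terminated checkpoint request.
%% SeqNo is a binary of digits (assumed to be sent as a JSON string);
%% SubSeqNo is an integer and is only used by the V2 form.
-spec checkpoint_request(v1 | v2, {binary(), non_neg_integer()}) -> iodata().
checkpoint_request(v1, {SeqNo, _SubSeqNo}) ->
    %% V1 carries the sequence number under the "checkpoint" key.
    [<<"{\"action\":\"checkpoint\",\"checkpoint\":\"">>, SeqNo, <<"\"}\n">>];
checkpoint_request(v2, {SeqNo, SubSeqNo}) ->
    %% V2 splits the position into sequence and subsequence numbers.
    [<<"{\"action\":\"checkpoint\",\"sequenceNumber\":\"">>, SeqNo,
     <<"\",\"subSequenceNumber\":">>, integer_to_binary(SubSeqNo),
     <<"}\n">>].
```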

MLD V1 responses:

{ "action": "checkpoint", "sequenceNumber": SEQNO, "error": ERROR }

MLD V2 responses:

{ "action": "checkpoint", "sequenceNumber": SEQNO, "subSequenceNumber": SUBSEQNO, "error": ERROR }

If checkpointing succeeded, ERROR will be null or absent, and the sequence number(s) will indicate which sequence/subsequence number was checkpointed. If checkpointing fails, a worker should probably exit.
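The success rule above (ERROR null or absent) might be checked as in the following sketch. It assumes the response has been decoded into a map with binary keys and that the decoder represents JSON null as the atom `null`; both depend on the JSON library in use. The module and function names are hypothetical.

```erlang
-module(mld_checkpoint_result_sketch).
-export([checkpoint_result/1]).

%% Interpret an already-decoded MLD checkpoint response (V1 or V2).
%% Returns {ok, {SeqNo, SubSeqNo}} on success; SubSeqNo is undefined
%% when the response does not carry one, as in V1.
-spec checkpoint_result(map()) -> {ok, {term(), term()}} | {error, term()}.
checkpoint_result(#{<<"action">> := <<"checkpoint">>} = Response) ->
    case maps:get(<<"error">>, Response, null) of
        null ->
            %% "error" is null or absent: the checkpoint succeeded.
            {ok, {maps:get(<<"sequenceNumber">>, Response, undefined),
                  maps:get(<<"subSequenceNumber">>, Response, undefined)}};
        Error ->
            {error, Error}
    end.
```

On `{error, _}` a caller would typically stop the worker, in line with the advice that a worker should probably exit when checkpointing fails.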


- The MLD ignores blank lines (accepts \n JSON \n), and so do we.

- There is no way to indicate failure except by a worker exiting. The MLD doesn't currently handle worker failure, so if we exit, the MLD and all of its workers will die.

- We emit checkpoint requests only while processing another request, and before returning our response to that request. For example, a checkpoint request may be sent after receiving a "processRecords" request or a non-ZOMBIE "shutdown" request, but before returning the corresponding "status" response.
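The ordering constraint in the last note can be pictured as a plan of outbound steps for a single MLD request: every checkpoint exchange comes first, and the status response comes last. This is only an illustrative sketch; the module name, function name, and step tuples are hypothetical, and waiting for each checkpoint response before continuing is inferred from the protocol having no pipelining.

```erlang
-module(mld_ordering_sketch).
-export([plan/2]).

%% Produce the ordered steps a worker takes while handling one MLD
%% request: zero or more checkpoint exchanges, then the status response.
-spec plan(binary(), [term()]) -> [term()].
plan(RequestAction, Checkpoints) ->
    CheckpointSteps =
        lists:append([[{send_checkpoint, CP}, await_checkpoint_response]
                      || CP <- Checkpoints]),
    CheckpointSteps ++ [{send_status, RequestAction}].
```

With no checkpoints, the plan reduces to the single status response.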

Types

-type checkpoint() :: #checkpoint{}.
-type data() :: #data{}.
-type sequence_number() :: #sequence_number{}.

Functions