Datadog.DataStreams.Aggregator (Data Streams Ex v1.2.2)

A GenServer responsible for aggregating many data points into 10-second buckets and then sending them to the Datadog agent. It holds a hierarchy of structs in memory, roughly as follows:

graph TD
    aggregator[Datadog.DataStreams.Aggregator]
    aggregator --> bucket[Datadog.DataStreams.Aggregator.Bucket]
    bucket --> group[Datadog.DataStreams.Aggregator.Group]

When adding data, the calling code creates a new Datadog.DataStreams.Aggregator.Point containing all of the needed data. It then calls Datadog.DataStreams.Aggregator.add/1 to add that point to the aggregator, which finds (or creates) the bucket matching the point's 10-second window. The aggregator then finds (or creates) a group in that bucket based on the point's hash. Once the group is found, its pathway_latency and edge_latency sketches (each a Datadog.Sketch) are updated with the new latency.
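The find-or-create steps above can be sketched as follows. This is a minimal illustration, not the library's internal code: the BucketSketch module is hypothetical, timestamps are assumed to be in nanoseconds, and plain lists stand in for the Datadog.Sketch values.

```elixir
defmodule BucketSketch do
  # Hypothetical illustration only; not the aggregator's real implementation.
  # Assumption: timestamps are expressed in nanoseconds.
  @bucket_duration_ns 10 * 1_000_000_000

  # Align a timestamp to the start of the 10-second window that contains it.
  def align(timestamp_ns), do: timestamp_ns - rem(timestamp_ns, @bucket_duration_ns)

  # Find (or create) the bucket for the point's window, then find (or create)
  # the group for the point's hash, and record both latencies.
  # Plain lists stand in for the Datadog.Sketch values.
  def add(buckets, %{timestamp: ts, hash: hash, pathway_latency: pl, edge_latency: el}) do
    new_group = %{pathway_latency: [pl], edge_latency: [el]}

    Map.update(buckets, align(ts), %{hash => new_group}, fn groups ->
      Map.update(groups, hash, new_group, fn group ->
        %{
          group
          | pathway_latency: [pl | group.pathway_latency],
            edge_latency: [el | group.edge_latency]
        }
      end)
    end)
  end
end
```

Two points with the same hash whose timestamps fall in the same 10-second window end up in the same group of the same bucket; a point from a later window creates a new bucket.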

Every 10 seconds the aggregator converts all inactive buckets (those outside the current 10-second window) into a Datadog.DataStreams.Payload, encodes it, and sends it to the Datadog agent. If sending the payload fails, the old buckets are still removed from memory, and the datadog.datastreams.aggregator.flush_errors.count telemetry metric is incremented.
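The flush behavior described above might look roughly like the sketch below. The FlushSketch module and its send_fun argument are hypothetical stand-ins for the real encode-and-send step, timestamps are again assumed to be in nanoseconds, and an error count is returned in place of emitting the telemetry metric.

```elixir
defmodule FlushSketch do
  # Hypothetical illustration only; not the aggregator's real implementation.
  # Assumption: bucket keys are window start times in nanoseconds.
  @bucket_duration_ns 10 * 1_000_000_000

  # Split buckets into those whose window has closed and those still active.
  def split(buckets, now_ns) do
    {inactive, active} =
      Enum.split_with(buckets, fn {start, _groups} ->
        start + @bucket_duration_ns <= now_ns
      end)

    {Map.new(inactive), Map.new(active)}
  end

  # `send_fun` stands in for building, encoding, and sending the payload.
  # Inactive buckets are dropped whether or not sending succeeds; a failure
  # only shows up in the returned error count.
  def flush(buckets, now_ns, send_fun) do
    {inactive, active} = split(buckets, now_ns)

    errors =
      case send_fun.(inactive) do
        :ok -> 0
        {:error, _reason} -> 1
      end

    {active, errors}
  end
end
```

Dropping inactive buckets even on failure keeps memory bounded when the agent is unreachable, at the cost of losing the data from that window.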

Functions

@spec add(
  Datadog.DataStreams.Aggregator.Point.t()
  | Datadog.DataStreams.Aggregator.Offset.t()
) :: :ok

Adds new metrics to the aggregator.

Note that this function still returns :ok when the aggregator is disabled.

Examples

iex> :ok = Aggregator.add(%Aggregator.Point{})

iex> :ok = Aggregator.add(%Aggregator.Offset{})

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

See Supervisor.

@spec flush() :: :ok

Sends all stored data to the Datadog agent.

Examples

iex> :ok = Aggregator.flush()

@spec start_link(Keyword.t()) :: GenServer.on_start()

Starts a new Datadog.DataStreams.Aggregator instance. This takes no options, as it uses the global Datadog.DataStreams.Config module. It is also started by Datadog.DataStreams.Application and should not need to be started manually.