View Source EventStreamex.Operators.Executor (EventStreamex v1.3.1)

Executes the operator tasks when an event is received.

A new executor is started by the EventStreamex.Operators.Scheduler as soon as an event is received. It is responsible to start the operator the scheduler gave it to handle the new event. And it will handle the operator's failures.

Meaning that if the operator crashes or returns an error, the executor will restart until the operator successfully finishes its task or has failed to many times.

Restart strategy

In the configuration you can set the restart strategy:

config :event_streamex,
  operator_queue_backoff_multiplicator: 2,
  operator_queue_max_restart_time: 10000,
  operator_queue_max_retries: 5,
  operator_queue_min_restart_time: 500
  • operator_queue_backoff_multiplicator: At each new retry, the last wait time is multiplied by this multiplicator to get the new time (ie: If it waited 1.5 second for the last retry and the multiplicator is 2, then, it will now wait 3 seconds before the next restart).
  • operator_queue_max_restart_time: The time we wait before a restart is maxed at this value.
  • operator_queue_max_retries: The maximum number of times we allow the operators to retry after a failure.
  • operator_queue_min_restart_time: The minimum time we will wait before a restart.

At each failure of an operator, the error logger is called (EventStreamex.Operators.Logger.ErrorLoggerAdapter) as well as a telemetry.

When an operator exceeds the maximum number of restarts the executor crashes as well as the EventStreamex.Operators.Scheduler and no more event will be processed until you restart the scheduler with EventStreamex.restart_scheduler/0.

This is because the order of the events is very important and we cannot allow an event to fail, otherwise it would compromise your data.

You can create your own EventStreamex.Operators.Logger.ErrorLoggerAdapter and add it to the configuration:

config :event_streamex,
  error_logger_adapter: {EventStreamex.Operators.Logger.LoggerAdapter, []}

Doing so, you could implement some kind of notification system when there is a failure to warn your team about it.

When a failure happens, maybe it's because of a temporary failure and you could just restart the scheduler. But maybe the failure is due to a coding failure. In this case you should correct the issue, ship a new version of your app and restart the scheduler (or the entire application).

With a database queue adapter (the default one), the queue is saved to database and will be loaded again when the application restarts. You won't miss any event.

The executor is used internally, you should never call it directly



Returns a specification to start this module under a supervisor.


Link to this function


View Source (since 1.0.0)

Returns a specification to start this module under a supervisor.

See Supervisor.