EventStreamex.Operators.Scheduler (EventStreamex v1.3.1)

Is responsible for starting operators as soon as an event is received.

When an event is received, the scheduler processes it by searching for any EventStreamex.Operators.Operator listening to it.

Then it starts an EventStreamex.Operators.Executor, which is responsible for running the operator to completion.

If the operator fails too many times, the executor fails as well, and so does the scheduler.

If this happens, no more events will be processed. This is a safeguard against inconsistency in your data, as events must be processed in order.

If the scheduler fails, you will have to restart it yourself using EventStreamex.restart_scheduler().

Before you do that, make sure the operator will not crash again. This may require code changes, or waiting for an external system to be up and running again.
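The recovery flow described above can be sketched as a small helper. This is only an illustration: `MyApp.SchedulerRecovery` is a hypothetical module, and it assumes that `EventStreamex.Operators.Scheduler.is_alive?/0` returns a boolean. The two functions are injectable so the helper can be tried without a running scheduler.

```elixir
defmodule MyApp.SchedulerRecovery do
  # Hypothetical helper, not part of EventStreamex.
  # Assumption: is_alive?/0 returns true or false.
  def ensure_running(
        alive? \\ &EventStreamex.Operators.Scheduler.is_alive?/0,
        restart \\ &EventStreamex.restart_scheduler/0
      ) do
    if alive?.() do
      :already_running
    else
      # Only call this once the cause of the crash has been fixed.
      restart.()
      :restarted
    end
  end
end
```

Calling `MyApp.SchedulerRecovery.ensure_running()` with no arguments uses the real scheduler functions; passing stubs makes the logic easy to exercise in isolation.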

Consider implementing your own EventStreamex.Operators.Logger.ErrorLoggerAdapter to be notified of the cause of a crash.

Functions

child_spec(init_arg)

(since 1.0.0)

Returns a specification to start this module under a supervisor.

See Supervisor.

curr_job()

(since 1.2.0)

Gets the list of operators being executed with their status.

The return value has this shape: [{atom(), {pid(), ref(), boolean()}}]

curr_job(module)

(since 1.2.0)

Gets information about the given operator if it is currently being executed, or returns :no_job otherwise.

The return value has this shape: {pid(), ref(), event(), boolean()}
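As a rough illustration of consuming this return value, the sketch below pattern-matches on the two documented shapes. `MyApp.JobReport` is a hypothetical module. It assumes that `ref()` is an Erlang reference, and it does not interpret the boolean, which it only reports as a flag.

```elixir
defmodule MyApp.JobReport do
  # Hypothetical helper, not part of EventStreamex.
  # Turns the documented result of curr_job(module) into a short string.
  def describe(:no_job), do: "no job running"

  # Assumption: the second element is a reference (is_reference/1 holds).
  def describe({pid, ref, _event, flag})
      when is_pid(pid) and is_reference(ref) and is_boolean(flag) do
    "job running in #{inspect(pid)} (flag: #{flag})"
  end
end
```

In an application this could be called as `MyApp.JobReport.describe(EventStreamex.Operators.Scheduler.curr_job(MyOperator))`, where `MyOperator` stands for one of your own operator modules.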

is_alive?()

(since 1.0.0)

Returns the status of the scheduler.

process_event(pid, event)

(since 1.0.0)

Function used to process an event.

Do not call this function yourself; it is used internally.

Event example:

%WalEx.Event{
  name: :user,
  type: :update,
  source: %WalEx.Event.Source{
    name: "WalEx",
    version: "3.8.0",
    db: "todos",
    schema: "public",
    table: "user",
    columns: %{
      id: "integer",
      name: "varchar",
      created_at: "timestamptz"
    }
  },
  new_record: %{
    id: 1234,
    name: "Chase Pursley",
    created_at: #DateTime<2023-08-18 14:09:05.988369-04:00 -04 Etc/UTC-4>
  },
  # we don't show old_record for update to reduce payload size
  # however, you can see any old values that changed under "changes"
  old_record: nil,
  changes: %{
    name: %{
      new_value: "Chase Pursley",
      old_value: "Chase"
    }
  },
  timestamp: ~U[2023-12-18 15:50:08.329504Z]
}
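
To show how the fields of such an event might be read, here is a minimal sketch that lists the columns changed by an update. `MyApp.EventInspector` is a hypothetical module. It matches on plain map keys, which also works for structs, so it can be tried without WalEx installed.

```elixir
defmodule MyApp.EventInspector do
  # Hypothetical helper, not part of EventStreamex or WalEx.
  # Returns the sorted list of fields changed by an :update event.
  def changed_fields(%{type: :update, changes: changes}) when is_map(changes) do
    changes |> Map.keys() |> Enum.sort()
  end

  # Any other event (insert, delete, or an update without changes).
  def changed_fields(_event), do: []
end
```

With the event above, `MyApp.EventInspector.changed_fields(event)` returns `[:name]`.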