EventStreamex.Operators.Scheduler (EventStreamex v1.3.1)
Is responsible for starting operators as soon as an event is received.
When an event is received, the scheduler processes it by searching for any EventStreamex.Operators.Operator listening to it.
It then starts an EventStreamex.Operators.Executor, which is responsible for running the operator to completion.
If the operator fails too many times, the executor fails as well, and so does the scheduler.
When this happens, no more events are processed. This is a safeguard against inconsistent data, since events must be processed in order.
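The "process in order, stop on repeated failure" behaviour described above can be illustrated with a small standalone sketch. This is not EventStreamex's implementation: the module name OrderedProcessingSketch, the handler function, its :ok return convention, and the max_attempts parameter are all assumptions made for the example.

```elixir
defmodule OrderedProcessingSketch do
  # Hypothetical illustration only, not part of EventStreamex.
  # Runs `handler` on each event in order. Each event gets up to
  # `max_attempts` tries. If every try fails, processing halts and the
  # remaining events are left untouched.
  def run(events, handler, max_attempts) when max_attempts > 0 do
    result =
      Enum.reduce_while(events, {:ok, []}, fn event, {:ok, done} ->
        case attempt(event, handler, max_attempts) do
          :ok -> {:cont, {:ok, [event | done]}}
          :error -> {:halt, {:halted, event, Enum.reverse(done)}}
        end
      end)

    case result do
      {:ok, done} -> {:ok, Enum.reverse(done)}
      halted -> halted
    end
  end

  # No attempts left: give up on this event.
  defp attempt(_event, _handler, 0), do: :error

  defp attempt(event, handler, attempts_left) do
    case handler.(event) do
      :ok -> :ok
      _failure -> attempt(event, handler, attempts_left - 1)
    end
  end
end

# A handler that always fails on :bad and succeeds on anything else.
handler = fn
  :bad -> :error
  _ -> :ok
end

OrderedProcessingSketch.run([:a, :bad, :c], handler, 3)
# => {:halted, :bad, [:a]}
```

Note that :c is never handled once :bad has exhausted its attempts. Skipping the failed event and continuing would break the ordering guarantee.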
If the scheduler fails, you have to restart it yourself using EventStreamex.restart_scheduler().
Before you do that, make sure the operator will not crash again. This may require code changes, or an external system to be up and running again.
Consider implementing your own EventStreamex.Operators.Logger.ErrorLoggerAdapter to be notified of why a crash occurred.
Functions
Returns a specification to start this module under a supervisor.
See Supervisor.
Gets the list of operators being executed with their status.
The return value has the following shape: [{atom(), {pid(), ref(), boolean()}}]
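As a minimal sketch of consuming a list of that shape, the example below pattern matches on each entry and groups operator names by the boolean flag. The meaning of the flag is not stated here, so the sketch treats it as opaque. JobListSketch and the sample data are hypothetical and do not call EventStreamex.

```elixir
defmodule JobListSketch do
  # Hypothetical helper, not part of EventStreamex.
  # Splits a list shaped like [{atom(), {pid(), ref(), boolean()}}]
  # into the names whose flag is true and the names whose flag is false.
  def split_by_flag(jobs) do
    {flagged, unflagged} =
      Enum.split_with(jobs, fn {_name, {_pid, _ref, flag}} -> flag end)

    {Enum.map(flagged, &elem(&1, 0)), Enum.map(unflagged, &elem(&1, 0))}
  end
end

# Sample data built by hand to mimic the documented shape.
jobs = [
  {:operator_a, {self(), make_ref(), true}},
  {:operator_b, {self(), make_ref(), false}}
]

JobListSketch.split_by_flag(jobs)
# => {[:operator_a], [:operator_b]}
```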
Gets the information of the given operator being executed, or :no_job otherwise.
The return value has the following shape: {pid(), ref(), event(), boolean()}
Returns the status of the scheduler.
Function used to process an event.
You must not call this function yourself.
Event example:
%WalEx.Event{
  name: :user,
  type: :update,
  source: %WalEx.Event.Source{
    name: "WalEx",
    version: "3.8.0",
    db: "todos",
    schema: "public",
    table: "user",
    columns: %{
      id: "integer",
      name: "varchar",
      created_at: "timestamptz"
    }
  },
  new_record: %{
    id: 1234,
    name: "Chase Pursley",
    created_at: #DateTime<2023-08-18 14:09:05.988369-04:00 -04 Etc/UTC-4>
  },
  # we don't show old_record for update to reduce payload size
  # however, you can see any old values that changed under "changes"
  old_record: nil,
  changes: %{
    name: %{
      new_value: "Chase Pursley",
      old_value: "Chase"
    }
  },
  timestamp: ~U[2023-12-18 15:50:08.329504Z]
}
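To show how the changes field of such an event might be read, here is a minimal sketch that pattern matches on the type and changes keys. It uses a plain map with the same keys as the example above so that it runs without WalEx installed; a map pattern of this kind also matches a struct carrying those keys. EventChangesSketch and changed_fields/1 are hypothetical names, not part of EventStreamex or WalEx.

```elixir
defmodule EventChangesSketch do
  # Hypothetical helper, not part of EventStreamex or WalEx.
  # Returns the changed fields of an update event as a list of
  # {field, old_value, new_value} tuples, sorted by field name.
  def changed_fields(%{type: :update, changes: changes}) when is_map(changes) do
    changes
    |> Enum.map(fn {field, %{old_value: old, new_value: new}} -> {field, old, new} end)
    |> Enum.sort()
  end

  # Any other event shape yields no diff.
  def changed_fields(_event), do: []
end

# Plain-map stand-in for the event shown above (source and timestamp omitted).
event = %{
  name: :user,
  type: :update,
  new_record: %{id: 1234, name: "Chase Pursley"},
  old_record: nil,
  changes: %{name: %{new_value: "Chase Pursley", old_value: "Chase"}}
}

EventChangesSketch.changed_fields(event)
# => [{:name, "Chase", "Chase Pursley"}]
```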