Buffy.ThrottleAndTimed behaviour (Buffy v2.3.0)
This is a variation on the Buffy.Throttle behaviour.
It keeps the following functionality:
- Wait for a specified amount of time before invoking the work function. If the function is called again before the time has elapsed, the call is a no-op.
Key differences between Buffy.Throttle and Buffy.ThrottleAndTimed:
- The process is not terminated once the timer is done; it is kept alive.
- Internally, the "timer already running" check is done via state rather than by handling the {:error, {:already_started, pid}} output of GenServer.start_link. See the note on Horde about state.
- It has an optional :loop_interval field value (set by config) that triggers work repeatedly based on an empty-inbox timeout interval, built on GenServer's timeout feature.
- It allows manipulating the state for each throttle via defoverridable functions (see the use case below).
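To make the state-based check concrete, here is a minimal, self-contained GenServer sketch. It is not Buffy's implementation: the module name ThrottleSketch, the throttle/2 client function, the :run_work message and the :throttle_ms and :report_to options are all hypothetical. It only illustrates the idea from the list above: a timer_ref stored in state decides whether a call schedules work or is a no-op, and the process stays alive after the work runs.

```elixir
defmodule ThrottleSketch do
  # Hypothetical sketch, not Buffy's code: whether a timer is already running
  # is tracked in state (timer_ref) instead of relying on
  # {:error, {:already_started, pid}} from GenServer.start_link/3.
  use GenServer

  def start_link(opts), do: GenServer.start_link(__MODULE__, opts)

  # Hypothetical client API: request work after the throttle period.
  def throttle(pid, args), do: GenServer.cast(pid, {:throttle, args})

  @impl true
  def init(opts) do
    {:ok,
     %{
       throttle_ms: Keyword.fetch!(opts, :throttle_ms),
       report_to: Keyword.fetch!(opts, :report_to),
       args: nil,
       timer_ref: nil
     }}
  end

  @impl true
  def handle_cast({:throttle, args}, %{timer_ref: nil} = state) do
    # No timer running: schedule the work and remember the timer in state.
    ref = Process.send_after(self(), :run_work, state.throttle_ms)
    {:noreply, %{state | args: args, timer_ref: ref}}
  end

  def handle_cast({:throttle, _args}, state) do
    # A timer is already running: this call is a no-op.
    {:noreply, state}
  end

  @impl true
  def handle_info(:run_work, state) do
    # Stand-in for handle_throttle/1: report the args to the caller.
    send(state.report_to, {:work_done, state.args})
    # Clear the timer so the next call starts a new one; the process stays alive.
    {:noreply, %{state | args: nil, timer_ref: nil}}
  end
end
```

Calling throttle/2 twice in quick succession yields a single {:work_done, args} message carrying the first call's args; a later call after the work has run starts a fresh timer on the same process.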
The main reason for these changes is that sometimes there's a need to fall back to time-interval-triggered work when nothing else triggers the work. This requires the process to stay alive rather than being terminated immediately after a successful throttled work execution.
In other words, we keep the throttle mechanism:
Once the timer has expired, the function will be called, and any subsequent calls will start a new timer.
```text
call        call        call        call        call
 |  call     |  call     |  call     |  call     |
 |   |       |   |       |   |       |   |       |
┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐
│ Timer 1 │ │ Timer 2 │ │ Timer 3 │ │ Timer 4 │
└─────────┘ └─────────┘ └─────────┘ └─────────┘
          |           |           |           |
          |           |           |           Fourth function invocation
          |           |           Third function invocation
          |           Second function invocation
          First function invocation
```
With the optional timed trigger enabled, the lifecycle looks like this:
```mermaid
graph TB
    A[Start Buffy] -->|start_link| B(Init Buffy)
    B --> |initial handle_continue| W(Do throttled work)
    S(Messages sent to Buffy) --> |message to trigger work| D{Can Throttle?}
    D --> |YES| W
    D --> |NO| C(Ignore message as throttle already scheduled)
    S --> |empty inbox timeout interval| P(Do immediate work)
    W --> |set message inbox timeout| S
    P --> |set message inbox timeout| S
```
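The lifecycle in the graph maps onto standard GenServer features. The sketch below is hypothetical (TimedLoopSketch, poke/1, do_work/2 and the :report_to option are made up for illustration) and leaves throttling out entirely; it only shows how an initial handle_continue/2 plus a timeout returned from each callback produces the "empty inbox timeout" loop.

```elixir
defmodule TimedLoopSketch do
  # Hypothetical sketch of the lifecycle above, not Buffy's code.
  use GenServer

  def start_link(opts), do: GenServer.start_link(__MODULE__, opts)

  # Any incoming message counts as activity; this cast does nothing else.
  def poke(pid), do: GenServer.cast(pid, :poke)

  @impl true
  def init(opts) do
    state = %{
      loop_interval: Keyword.fetch!(opts, :loop_interval),
      report_to: Keyword.fetch!(opts, :report_to),
      runs: 0
    }

    # "initial handle_continue": run the first piece of work right after init/1.
    {:ok, state, {:continue, :work}}
  end

  @impl true
  def handle_continue(:work, state) do
    state = do_work(state, :initial)
    # "set message inbox timeout": GenServer sends :timeout if no message
    # arrives within loop_interval milliseconds.
    {:noreply, state, state.loop_interval}
  end

  @impl true
  def handle_cast(:poke, state) do
    # A message arrived, so the idle countdown starts over.
    {:noreply, state, state.loop_interval}
  end

  @impl true
  def handle_info(:timeout, state) do
    # "empty inbox timeout interval": do the work immediately, then re-arm.
    state = do_work(state, :timeout)
    {:noreply, state, state.loop_interval}
  end

  # Hypothetical stand-in for the real work: notify the caller.
  defp do_work(state, reason) do
    runs = state.runs + 1
    send(state.report_to, {:work, reason, runs})
    %{state | runs: runs}
  end
end
```

Because GenServer only delivers :timeout when no message arrives within the interval, every handled message (such as the :poke cast) pushes the next timed run further out.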
Note on usage with Horde
Under Horde, the state unfortunately doesn't get synced automatically; that requires explicit tooling.
Therefore the state will be "reset" to the initial state when the process boots up. This is not a big issue, as the initial state schedules a throttled run of handle_throttle/1.
How to start timed interval triggers when your application boots up
By design, nothing runs when your application starts. If you need the inbox timeout loop to start at boot, add a child spec for a Task to your application supervisor (typically in application.ex) that calls throttle/1 for as many instances as necessary.
An example implementation:
```elixir
# application.ex
def start(_type, _args) do
  ...
  children = [
    ...
    Supervisor.child_spec(
      {Task,
       fn ->
         for x <- 1..10, do: MyModuleUsingThrottleAndTimed.throttle(some: "value", x: x)
       end},
      id: MyModuleUsingThrottleAndTimedInit,
      restart: :temporary
    )
  ]
  ...
end
```
Example Usage
You'll first need to create a module that will be used to throttle.
```elixir
defmodule MyTask do
  use Buffy.ThrottleAndTimed,
    throttle: :timer.minutes(2),
    loop_interval: :timer.minutes(2)

  def handle_throttle(args) do
    # Do something with args
  end
end
```
Next, you can use the throttle/1 function with the registered module.
```elixir
iex> MyTask.throttle(args)
:ok
```
Options
- :throttle (non_neg_integer) - Optional. The amount of time to wait before invoking the function, in milliseconds.
- :registry_module (atom) - Optional. A module that implements the Registry behaviour. If you are running in a distributed instance, you can set this value to Horde.Registry. Defaults to Registry.
- :registry_name (atom) - Optional. The name of the registry to use. Defaults to the built-in Buffy registry, but if you are running in a distributed instance you can set this value to a named Horde.Registry process. Defaults to Buffy.Registry.
- :restart (:permanent | :temporary | :transient) - Optional. The restart strategy to use for the GenServer. Defaults to :temporary.
- :supervisor_module (atom) - Optional. A module that implements the DynamicSupervisor behaviour. If you are running in a distributed instance, you can set this value to Horde.DynamicSupervisor. Defaults to DynamicSupervisor.
- :supervisor_name (atom) - Optional. The name of the dynamic supervisor to use. Defaults to the built-in Buffy dynamic supervisor, but if you are running in a distributed instance you can set this value to a named Horde.DynamicSupervisor process. Defaults to Buffy.DynamicSupervisor.
- :loop_interval (non_neg_integer | :infinity) - Optional. The amount of time, in milliseconds, that this process will wait while its inbox is empty before a :timeout message is sent (handled via handle_info). The countdown resets whenever a message comes in. Without this option, no timed work is triggered and the module behaves like Buffy.Throttle, apart from the differences listed above.
Dynamic Options
Sometimes you want a different throttle value or loop interval based on the arguments you pass in. To handle this, you can implement the optional get_throttle/1 and get_loop_interval/1 functions in your throttle and timed module. They take the arguments and return the throttle and loop interval values, respectively. For example:
```elixir
defmodule MyThrottler do
  use Buffy.ThrottleAndTimed,
    registry_module: Horde.Registry,
    registry_name: MyApp.HordeRegistry,
    supervisor_module: Horde.DynamicSupervisor,
    supervisor_name: MyApp.HordeDynamicSupervisor,
    throttle: :timer.minutes(2)

  def get_loop_interval(args) do
    case args do
      %Cat{} -> :timer.minutes(2)
      %Dog{} -> :timer.seconds(10)
      _ -> 0
    end
  end

  def handle_throttle(args) do
    # Do something with args
  end
end
```
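Since %Cat{} and %Dog{} are not defined in the snippet above, here is a self-contained variant with hypothetical Cat and Dog structs, written as multi-clause functions instead of a case. It uses :infinity as the fallback, which the loop_interval() type allows. Assuming the returned value is used directly as a GenServer timeout, :infinity means no timed trigger at all, whereas 0 would fire :timeout as soon as the inbox is empty.

```elixir
# Hypothetical structs, defined here only so the example compiles on its own.
defmodule Cat do
  defstruct [:name]
end

defmodule Dog do
  defstruct [:name]
end

defmodule DynamicIntervalSketch do
  # Same shape as get_loop_interval/1 above, one clause per argument type.
  def get_loop_interval(%Cat{}), do: :timer.minutes(2)
  def get_loop_interval(%Dog{}), do: :timer.seconds(10)
  # :infinity is a valid loop_interval(); as a GenServer timeout it never fires.
  def get_loop_interval(_), do: :infinity
end
```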
Example Usage
Have throttle/1 add data to the state, to be processed in handle_throttle/1:
```elixir
defmodule MyTimedSlowBucketingThrottler do
  use Buffy.ThrottleAndTimed,
    throttle: 100,
    supervisor_module: DynamicSupervisor,
    supervisor_name: MyDynamicSupervisor

  def handle_throttle(%{test_pid: test_pid, values: values} = args) do
    Process.sleep(200)
    send(test_pid, {:ok, args, System.monotonic_time()})
    values
  end

  def args_to_key(%{key: key}), do: key |> :erlang.term_to_binary() |> :erlang.phash2()

  def update_args(%{values: values} = old_arg, %{values: new_values} = _new_arg)
      when is_list(values) and is_list(new_values) do
    %{old_arg | values: Enum.sort(values ++ new_values)}
  end

  def update_state_with_work_result(%{args: args} = state, _result) do
    # Because `handle_throttle()` runs in the `:continue` lifecycle of GenServer,
    # inbox processing is paused until the logic completes. The inbox keeps receiving new messages
    # from calls to `throttle()`, but they are processed only after `handle_throttle()` completes,
    # so the values in state are the ones that were just processed and can be cleared.
    %{state | args: %{args | values: []}}
  end
end
```
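The bucketing callbacks are ordinary functions, so their combined effect can be checked without a running process. The sketch below is hypothetical (BucketingSketch, merge_calls/1 and clear_values/1 are made up, and the order in which Buffy invokes these callbacks internally is an assumption). It folds several calls that share a key through the same update_args/2 logic, then clears the processed values the way update_state_with_work_result/2 does.

```elixir
defmodule BucketingSketch do
  # Same key derivation as args_to_key/1 above: only :key affects the hash.
  def args_to_key(%{key: key}), do: key |> :erlang.term_to_binary() |> :erlang.phash2()

  # Same merge rule as update_args/2 above: concatenate and sort the values.
  def update_args(%{values: values} = old_arg, %{values: new_values})
      when is_list(values) and is_list(new_values) do
    %{old_arg | values: Enum.sort(values ++ new_values)}
  end

  # Hypothetical helper: fold a batch of calls (assumed to share one key)
  # into a single pending args map, oldest call first.
  def merge_calls([first | rest]) do
    Enum.reduce(rest, first, fn new_arg, acc -> update_args(acc, new_arg) end)
  end

  # Hypothetical mirror of update_state_with_work_result/2: drop processed values.
  def clear_values(%{args: args} = state), do: %{state | args: %{args | values: []}}
end
```

Three calls with values [3], [1] and [2] under the same key end up as one pending args map with values [1, 2, 3], which is what a single handle_throttle/1 run would then receive.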
Using with Horde
If you are running Elixir in a cluster, you can use Horde to run only one of your throttled functions at a time. To do this, set the :registry_module and :supervisor_module options to Horde.Registry and Horde.DynamicSupervisor respectively. You'll also need to set the :registry_name and :supervisor_name options to the names of the Horde registry and dynamic supervisor you want to use.
```elixir
defmodule MyThrottler do
  use Buffy.ThrottleAndTimed,
    registry_module: Horde.Registry,
    registry_name: MyApp.HordeRegistry,
    supervisor_module: Horde.DynamicSupervisor,
    supervisor_name: MyApp.HordeDynamicSupervisor,
    throttle: :timer.minutes(2),
    loop_interval: :timer.minutes(10)

  def handle_throttle(args) do
    # Do something with args
  end
end
```
Telemetry
These are the events emitted by the Buffy.ThrottleAndTimed module:
- [:buffy, :throttle, :throttle] - Emitted when the throttle/1 function is called.
- [:buffy, :throttle, :timeout] - Emitted when the inbox timeout is triggered.
- [:buffy, :throttle, :handle, :start] - Emitted at the start of the handle_throttle/1 function.
- [:buffy, :throttle, :handle, :stop] - Emitted at the end of the handle_throttle/1 function.
- [:buffy, :throttle, :handle, :exception] - Emitted when an error is raised in the handle_throttle/1 function.
All of these events will have the following metadata:
- :args - The arguments passed to the throttle/1 function.
- :key - A hash of the passed arguments used to deduplicate the throttled function.
- :module - The module using Buffy.ThrottleAndTimed.
With the additional metadata for [:buffy, :throttle, :handle, :stop]:
- :result - The return value of the handle_throttle/1 function.
Memory Leaks
With any sort of debounce and Elixir processes, you need to be careful about having too many processes, or too much state in memory at the same time. If you handle large amounts of data, there is a good chance you'll end up with high memory usage and possibly affect other parts of your system.
To help monitor this usage, Buffy has a telemetry metric that measures the Elixir process memory usage. If you summarize this metric, you should get a good view into your Buffy throttle processes.
```elixir
summary("buffy.throttle.total_heap_size", tags: [:module])
```
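To spot-check a single process by hand, the standard library exposes a per-process :total_heap_size item through Process.info/2. This sketch assumes, without confirmation from Buffy's source, that the metric is based on that figure; HeapSizeSketch and total_heap_bytes/1 are hypothetical names. The raw value is measured in words, so it is converted to bytes here.

```elixir
defmodule HeapSizeSketch do
  # Returns the total heap size of `pid` in bytes, or nil if the process is dead.
  def total_heap_bytes(pid) do
    case Process.info(pid, :total_heap_size) do
      # Process.info/2 reports the size in words; convert using the VM word size.
      {:total_heap_size, words} -> words * :erlang.system_info(:wordsize)
      nil -> nil
    end
  end
end
```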
Types
@type args() :: term()
A list of arbitrary arguments that are used for the handle_throttle/1 function.
@type key() :: term()
A unique key for debouncing. This is used for GenServer uniqueness and is generated from hashing all of the args.
@type loop_interval() :: non_neg_integer() | :infinity
The amount of time that this process will wait while its inbox is empty before sending a :timeout message.
@type state() :: %{ args: args(), key: key(), loop_interval: loop_interval(), timer_ref: nil | reference() }
Internal state that Buffy.ThrottleAndTimed keeps.
Callbacks
args_to_key/1
Generates a unique key for the given arguments.
@callback get_loop_interval(args()) :: loop_interval()
Returns the loop interval (in milliseconds, or :infinity) to use for the given arguments.
@callback get_throttle(args()) :: non_neg_integer()
Returns the amount of throttle time in milliseconds.
handle_throttle/1
The function called after the throttle has completed. This function will receive the arguments passed to the throttle/1 function.
throttle/1
A function to call the throttle. This starts the throttle and waits the configured throttle time before calling the handle_throttle/1 function.