TemporalSdk.Worker (temporal_sdk v0.2.11)
Task worker module.
The task worker is a supervised group of processes that manages the processing of Temporal task executions. The task executions themselves are managed by the Temporal server. Task worker processes are supervised by the SDK cluster supervisor.
Task worker responsibilities:
- configure and start the task worker statistics telemetry poller,
- configure worker-level fixed window rate limiter time windows,
- configure worker-level options, such as worker Temporal :namespace,
- configure Temporal task poller rate limiter,
- configure, start and supervise Temporal task pollers pool,
- configure and enforce rate limiter limits,
- set up and spawn task executors for Temporal task executions polled by the task pollers.
Task Worker Configuration
Summary
Functions
Returns the current dynamic configuration of the rate limiter.
Updates the dynamic configuration of the rate limiter.
Updates the dynamic configuration of the rate limiter on the Erlang nodes list.
Functions
@spec count( cluster :: :temporal_sdk_cluster.cluster_name(), worker_type :: :activity | :nexus | :workflow ) :: {:ok, non_neg_integer()} | :temporal_sdk_worker.invalid_error()
@spec get_limiter_config( cluster :: :temporal_sdk_cluster.cluster_name(), worker_type :: :temporal_sdk_worker.worker_type(), worker_id :: :temporal_sdk_worker.worker_id() ) :: {:ok, :temporal_sdk_worker.limiter_config()} | :temporal_sdk_worker.invalid_error()
Returns the current dynamic configuration of the rate limiter.
Example:
Elixir
iex(1)> TemporalSdk.Worker.start(:cluster_1, :activity, worker_id: "test_worker",
task_queue: "test_tq", task_poller_pool_size: 1,
limits: %{:worker => %{:activity_regular => {10, 600}}})
{:ok,
%{
task_queue: "test_tq",
worker_id: "test_worker",
task_poller_pool_size: 1,
limits: %{
node: %{},
os: %{},
worker: %{activity_regular: {10, 600}},
cluster: %{}
},
task_poller_limiter: %{limit: :infinity, time_window: :undefined},
limiter_check_frequency: 500,
task_settings: %{
data: :undefined,
last_heartbeat: [:undefined],
heartbeat_timeout_ratio: 0.8,
schedule_to_close_timeout_ratio: 0.8,
start_to_close_timeout_ratio: 0.8
},
namespace: ~c"default",
telemetry_poll_interval: 10000,
limiter_time_windows: %{activity_regular: 60000},
worker_version: %{},
temporal_name_to_erlang: &:temporal_sdk_api.temporal_name_to_erlang/2,
allowed_erlang_modules: :all,
allowed_temporal_names: :all
}}
iex(2)> TemporalSdk.Worker.get_limiter_config(:cluster_1, :activity, "test_worker")
{:ok,
%{
limits: %{
node: %{},
os: %{},
worker: %{activity_regular: {10, 600}},
cluster: %{}
},
task_poller_limiter: %{limit: :infinity, time_window: :undefined},
limiter_check_frequency: 500
}}
Erlang
1> temporal_sdk_worker:start(cluster_1, activity, [{worker_id, "test_worker"},
{task_queue, "test_tq"}, {task_poller_pool_size, 1},
{limits, #{worker => #{activity_regular => {10, 600}}}}]).
{ok,#{namespace => "default",task_queue => "test_tq",
telemetry_poll_interval => 10000,
limiter_time_windows => #{activity_regular => 60000},
worker_version => #{},worker_id => "test_worker",
task_settings =>
#{data => undefined,
last_heartbeat => [undefined],
heartbeat_timeout_ratio => 0.8,
schedule_to_close_timeout_ratio => 0.8,
start_to_close_timeout_ratio => 0.8},
limits =>
#{node => #{},os => #{},
worker => #{activity_regular => {10,600}},
cluster => #{}},
task_poller_limiter =>
#{limit => infinity,time_window => undefined},
task_poller_pool_size => 1,limiter_check_frequency => 500,
temporal_name_to_erlang =>
fun temporal_sdk_api:temporal_name_to_erlang/2,
allowed_erlang_modules => all,
allowed_temporal_names => all}}
2> temporal_sdk_worker:get_limiter_config(cluster_1, activity, "test_worker").
{ok,#{limits =>
#{node => #{},os => #{},
worker => #{activity_regular => {10,600}},
cluster => #{}},
task_poller_limiter =>
#{limit => infinity,time_window => undefined},
limiter_check_frequency => 500}}
@spec is_started( cluster :: :temporal_sdk_cluster.cluster_name(), worker_type :: :temporal_sdk_worker.worker_type(), worker_id :: :temporal_sdk_worker.worker_id() ) :: boolean()
@spec list( cluster :: :temporal_sdk_cluster.cluster_name(), worker_type :: :activity | :nexus | :workflow ) :: {:ok, [:temporal_sdk_worker.worker_id()]} | :temporal_sdk_worker.invalid_error()
@spec options( cluster :: :temporal_sdk_cluster.cluster_name(), worker_type :: :temporal_sdk_worker.worker_type(), worker_id :: :temporal_sdk_worker.worker_id() ) :: {:ok, :temporal_sdk_worker.opts()} | :temporal_sdk_worker.invalid_error()
@spec set_limiter_config( cluster :: :temporal_sdk_cluster.cluster_name(), worker_type :: :temporal_sdk_worker.worker_type(), worker_id :: :temporal_sdk_worker.worker_id(), new_limiter_config :: :temporal_sdk_worker.limiter_config() | :temporal_sdk_worker.limiter_config_as_list() ) :: :temporal_sdk_worker.set_limiter_config_ret()
Updates the dynamic configuration of the rate limiter.
Default values for the :limits limiter levels are set to %{}, which means that setting
:limits to %{} will reset all rate limiter concurrency and fixed window limits.
Task worker will not start new tasks until the new rate limiter limits, defined in the updated limiter configuration, are satisfied. For example, if the number of currently ongoing task executions exceeds the concurrency limits set by the new rate limiter configuration, new tasks will not be started until the overflow task executions are drained to meet the new limits.
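The draining behaviour described above can be pictured with a small model. This is only a sketch under stated assumptions: `DrainSketch`, `can_start?/2` and `to_drain/2` are hypothetical names that are not part of the SDK, and the limit is treated as a plain integer concurrency cap rather than the SDK's own limit format.

```elixir
defmodule DrainSketch do
  # Hypothetical model, not SDK code: a new task may start only while the
  # number of running executions is below the concurrency limit.
  def can_start?(running, limit) when is_integer(running) and is_integer(limit),
    do: running < limit

  # Number of running executions that must finish before a new task may start.
  def to_drain(running, limit) when is_integer(running) and is_integer(limit),
    do: max(running - limit + 1, 0)
end

# Ten executions are running when the limit is lowered to 1.
IO.inspect(DrainSketch.can_start?(10, 1))
IO.inspect(DrainSketch.to_drain(10, 1))
```

In this model no running execution is interrupted. The worker simply declines to start new tasks until enough executions have completed.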
The SDK Samples Rate Limiter example demonstrates the use of this function.
Example:
Elixir
iex(1)> TemporalSdk.Worker.start(:cluster_1, :activity, worker_id: "test_worker",
task_queue: "test_tq", task_poller_pool_size: 1,
limits: %{:worker => %{:activity_regular => {10, 600}}})
{:ok,
%{
task_queue: "test_tq",
temporal_name_to_erlang: &:temporal_sdk_api.temporal_name_to_erlang/2,
namespace: ~c"default",
worker_version: %{},
task_settings: %{
data: :undefined,
last_heartbeat: [:undefined],
start_to_close_timeout_ratio: 0.8,
schedule_to_close_timeout_ratio: 0.8,
heartbeat_timeout_ratio: 0.8
},
worker_id: "test_worker",
allowed_temporal_names: :all,
allowed_erlang_modules: :all,
limiter_time_windows: %{activity_regular: 60000},
telemetry_poll_interval: 10000,
limits: %{
node: %{},
os: %{},
worker: %{activity_regular: {10, 600}},
cluster: %{}
},
task_poller_pool_size: 1,
task_poller_limiter: %{limit: :infinity, time_window: :undefined},
limiter_check_frequency: 500
}}
iex(2)> TemporalSdk.Worker.set_limiter_config(:cluster_1, :activity, "test_worker",
limits: %{:worker => %{:activity_regular => {1, 60}}},
task_poller_limiter: %{:limit => 60, :time_window => {1, :minute}})
:ok
iex(3)> TemporalSdk.Worker.get_limiter_config(:cluster_1, :activity, "test_worker")
{:ok,
%{
limits: %{
node: %{},
os: %{},
worker: %{activity_regular: {1, 60}},
cluster: %{}
},
task_poller_limiter: %{limit: 60, time_window: {1, :minute}},
limiter_check_frequency: 500
}}
iex(4)> TemporalSdk.Worker.set_limiter_config(:cluster_1, :activity, "test_worker",
limits: %{})
:ok
iex(5)> TemporalSdk.Worker.get_limiter_config(:cluster_1, :activity, "test_worker")
{:ok,
%{
limits: %{node: %{}, os: %{}, worker: %{}, cluster: %{}},
task_poller_limiter: %{limit: 60, time_window: {1, :minute}},
limiter_check_frequency: 500
}}
Erlang
1> temporal_sdk_worker:start(cluster_1, activity, [{worker_id, "test_worker"},
{task_queue, "test_tq"}, {task_poller_pool_size, 1},
{limits, #{worker => #{activity_regular => {10, 600}}}}]).
{ok,#{namespace => "default",worker_id => "test_worker",
task_queue => "test_tq",
task_settings =>
#{data => undefined,
last_heartbeat => [undefined],
heartbeat_timeout_ratio => 0.8,
schedule_to_close_timeout_ratio => 0.8,
start_to_close_timeout_ratio => 0.8},
worker_version => #{},allowed_temporal_names => all,
allowed_erlang_modules => all,
temporal_name_to_erlang =>
fun temporal_sdk_api:temporal_name_to_erlang/2,
task_poller_pool_size => 1,
task_poller_limiter =>
#{limit => infinity,time_window => undefined},
limits =>
#{node => #{},os => #{},
worker => #{activity_regular => {10,600}},
cluster => #{}},
limiter_check_frequency => 500,
limiter_time_windows => #{activity_regular => 60000},
telemetry_poll_interval => 10000}}
2> temporal_sdk_worker:set_limiter_config(cluster_1, activity, "test_worker",
[{limits, #{worker => #{activity_regular => {1, 60}}}},
{task_poller_limiter, #{limit => 60, time_window => {1, minute}}}]).
ok
3> temporal_sdk_worker:get_limiter_config(cluster_1, activity, "test_worker").
{ok,#{task_poller_limiter =>
#{limit => 60,time_window => {1,minute}},
limits =>
#{node => #{},os => #{},
worker => #{activity_regular => {1,60}},
cluster => #{}},
limiter_check_frequency => 500}}
4> temporal_sdk_worker:set_limiter_config(cluster_1, activity, "test_worker", [{limits, #{}}]).
ok
5> temporal_sdk_worker:get_limiter_config(cluster_1, activity, "test_worker").
{ok,#{task_poller_limiter =>
#{limit => 60,time_window => {1,minute}},
limits =>
#{node => #{},os => #{},worker => #{},cluster => #{}},
limiter_check_frequency => 500}}
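The example sessions above suggest that an update replaces only the keys it names, and that omitted :limits levels fall back to %{}. The sketch below models that observed behaviour. It is an assumption drawn from the example output, not the SDK's implementation, and `LimiterConfigSketch` and `apply_update/2` are hypothetical names.

```elixir
defmodule LimiterConfigSketch do
  # Default value of every :limits level, as shown in the examples.
  @levels %{node: %{}, os: %{}, worker: %{}, cluster: %{}}

  # Hypothetical model, not SDK code. Accepts the update as a map or as a
  # keyword list, mirroring the two forms set_limiter_config/4 accepts.
  def apply_update(current, update) do
    update = Map.new(update)

    update =
      case update do
        # Levels missing from the new :limits fall back to their defaults.
        %{limits: limits} -> %{update | limits: Map.merge(@levels, limits)}
        _ -> update
      end

    # Keys absent from the update keep their current values.
    Map.merge(current, update)
  end
end

current = %{
  limits: %{node: %{}, os: %{}, worker: %{activity_regular: {10, 600}}, cluster: %{}},
  task_poller_limiter: %{limit: :infinity, time_window: :undefined},
  limiter_check_frequency: 500
}

step2 =
  LimiterConfigSketch.apply_update(current,
    limits: %{worker: %{activity_regular: {1, 60}}},
    task_poller_limiter: %{limit: 60, time_window: {1, :minute}}
  )

step4 = LimiterConfigSketch.apply_update(step2, limits: %{})
IO.inspect(step4)
```

Under this model, resetting :limits leaves :task_poller_limiter and :limiter_check_frequency unchanged, which matches the last get_limiter_config/3 result in the example.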
@spec set_limiter_config( cluster :: :temporal_sdk_cluster.cluster_name(), worker_type :: :temporal_sdk_worker.worker_type(), worker_id :: :temporal_sdk_worker.worker_id(), new_limiter_config :: :temporal_sdk_worker.limiter_config() | :temporal_sdk_worker.limiter_config_as_list(), nodes :: [node()] ) :: :ok | [ {:ok, :temporal_sdk_worker.set_limiter_config_ret()} | {:error, {:erpc, reason :: term()}} | term() ]
Updates the dynamic configuration of the rate limiter on the Erlang nodes list.
Function applies set_limiter_config/4 on the Erlang nodes provided as a list using
:erpc.multicall/4.
Returns :ok if the configuration update succeeded on all nodes.
Returns the :erpc.multicall/4 formatted result list if the update fails on any of the nodes.
Successful example:
Elixir
iex(a@host)1> n = Node.list() ++ [Node.self()]
[:b@host, :a@host]
iex(a@host)2> TemporalSdk.Worker.set_limiter_config(:cluster_1, :activity, "worker",
[{:limits, %{}}], n)
:ok
Erlang
(a@host)1> N = nodes() ++ [node()].
[b@host,a@host]
(a@host)2> temporal_sdk_worker:set_limiter_config(cluster_1, activity, "worker", #{limits => #{}}, N).
ok
Example with 3 nodes, where the first node update is successful, the second node does not have a worker started, and the third node is not accessible:
Elixir
iex(a@host)1> n = [:b@host, :a@host, :c@host]
[:b@host, :a@host, :c@host]
iex(a@host)2> TemporalSdk.Worker.set_limiter_config(:cluster_1, :activity, "worker",
[{:limits, %{}}], n)
[ok: :ok, ok: :invalid_worker, error: {:erpc, :noconnection}]
Erlang
(a@host)1> N = [b@host, a@host, c@host].
[b@host,a@host,c@host]
(a@host)3> temporal_sdk_worker:set_limiter_config(cluster_1, activity, "worker", #{limits => #{}}, N).
[{ok,ok},{ok,invalid_worker},{error,{erpc,noconnection}}]
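When the call returns a result list instead of :ok, it can be useful to see which node produced which result. The sketch below pairs nodes with results, assuming the list is in the same order as the nodes argument, as the example above suggests. `MulticallSketch` and `failed_nodes/2` are hypothetical helper names, not SDK functions.

```elixir
defmodule MulticallSketch do
  # Hypothetical helper, not SDK code.
  # A bare :ok means the update succeeded on every node.
  def failed_nodes(:ok, _nodes), do: []

  # Otherwise pair each node with its result and keep only the entries
  # that are not {:ok, :ok}.
  def failed_nodes(results, nodes) when is_list(results) do
    nodes
    |> Enum.zip(results)
    |> Enum.reject(fn {_node, result} -> result == {:ok, :ok} end)
  end
end

# Node list and result list copied from the example above.
nodes = [:b@host, :a@host, :c@host]
results = [ok: :ok, ok: :invalid_worker, error: {:erpc, :noconnection}]

failed = MulticallSketch.failed_nodes(results, nodes)
IO.inspect(failed)
```

Here `failed` holds two entries: one for the node without a started worker and one for the unreachable node.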
@spec start( cluster :: :temporal_sdk_cluster.cluster_name(), worker_type :: :activity | :nexus | :workflow, worker_opts :: :temporal_sdk_worker.opts() | :temporal_sdk_worker.opts_as_list() ) :: {:ok, :temporal_sdk_worker.opts()} | {:invalid_opts, map()} | :temporal_sdk_worker.invalid_error() | :supervisor.startchild_ret()
@spec start( cluster :: :temporal_sdk_cluster.cluster_name(), worker_type :: :activity | :nexus | :workflow, worker_opts :: :temporal_sdk_worker.opts() | :temporal_sdk_worker.opts_as_list(), nodes :: [node()] ) :: :ok
@spec stats( cluster :: :temporal_sdk_cluster.cluster_name(), worker_type :: :temporal_sdk_worker.worker_type(), worker_id :: :temporal_sdk_worker.worker_id() ) :: {:ok, :temporal_sdk_limiter.stats()} | :temporal_sdk_worker.invalid_error()
@spec terminate( cluster :: :temporal_sdk_cluster.cluster_name(), worker_type :: :activity | :nexus | :workflow, worker_id :: :temporal_sdk_worker.worker_id() ) :: :ok | {:error, :invalid_cluster | :not_found | :simple_one_for_one}
@spec terminate( cluster :: :temporal_sdk_cluster.cluster_name(), worker_type :: :activity | :nexus | :workflow, worker_id :: :temporal_sdk_worker.worker_id(), nodes :: [node()] ) :: :ok