temporal_sdk_worker (temporal_sdk v0.1.14)

View Source

Task worker module.

The task worker is a supervised group of processes that manages the processing of Temporal task executions. Temporal task executions are managed by the Temporal server. Task worker processes are supervised by the SDK cluster supervisor.

Task worker responsibilities:

  • configure and start the task worker statistics telemetry poller,
  • configure worker-level fixed window rate limiter time windows,
  • configure worker-level options, such as worker Temporal namespace,
  • configure Temporal task poller rate limiter,
  • configure, start and supervise Temporal task pollers pool,
  • configure and enforce rate limiter limits,
  • set up and spawn task executors for Temporal task executions polled by the task pollers.

Task Worker Configuration

Summary

Types

activity_settings()

-type activity_settings() ::
          #{data => temporal_sdk_activity:data(),
            last_heartbeat => temporal_sdk_activity:heartbeat(),
            heartbeat_timeout_ratio => float(),
            schedule_to_close_timeout_ratio => float(),
            start_to_close_timeout_ratio => float()}.

allowed_erlang_modules()

-type allowed_erlang_modules() :: all | [module()].

allowed_temporal_names()

-type allowed_temporal_names() :: all | [unicode:chardata()].

id()

-type id() :: atom() | unicode:chardata().

invalid_error()

-type invalid_error() :: {error, invalid_cluster | invalid_worker}.

limiter_config()

-type limiter_config() ::
          #{task_poller_limiter => task_poller_limiter(),
            limits => temporal_sdk_limiter:levels_limits(),
            limiter_check_frequency => pos_integer()}.

Dynamic configuration of the rate limiter.

Use get_limiter_config/3 to retrieve and set_limiter_config/4 to update the dynamic configuration of rate limiters. See start/3 for descriptions of the configuration options.

limiter_time_windows_activity()

-type limiter_time_windows_activity() :: #{activity_regular => temporal_sdk_limiter:time_window()}.

limiter_time_windows_nexus()

-type limiter_time_windows_nexus() :: #{nexus => temporal_sdk_limiter:time_window()}.

limiter_time_windows_session()

-type limiter_time_windows_session() :: #{activity_session => temporal_sdk_limiter:time_window()}.

limiter_time_windows_workflow()

-type limiter_time_windows_workflow() ::
          #{activity_direct => temporal_sdk_limiter:time_window(),
            activity_eager => temporal_sdk_limiter:time_window(),
            workflow => temporal_sdk_limiter:time_window()}.

nexus_settings()

-type nexus_settings() ::
          #{data => temporal_sdk_nexus:data(),
            task_timeout_ratio => float(),
            error_type => unicode:chardata()}.

opts()

-type opts() ::
          #{worker_id => worker_id(),
            namespace => unicode:chardata(),
            task_queue := unicode:chardata() | session_task_queue_name_fun(),
            task_settings => task_settings(),
            worker_version => worker_version(),
            allowed_temporal_names => allowed_temporal_names(),
            allowed_erlang_modules => allowed_erlang_modules(),
            temporal_name_to_erlang => temporal_name_to_erlang(),
            task_poller_pool_size => pos_integer(),
            task_poller_limiter => task_poller_limiter(),
            limits => temporal_sdk_limiter:levels_limits(),
            limiter_check_frequency => pos_integer(),
            limiter_time_windows =>
                limiter_time_windows_activity() |
                limiter_time_windows_workflow() |
                limiter_time_windows_session() |
                limiter_time_windows_nexus(),
            telemetry_poll_interval => temporal_sdk:time()}.

session_task_queue_name_fun()

-type session_task_queue_name_fun() ::
          fun((Cluster :: temporal_sdk_cluster:cluster_name(),
               Namespace :: unicode:chardata(),
               ParentTaskQueueName :: unicode:chardata()) ->
                  unicode:chardata()).

set_limiter_config_ret()

-type set_limiter_config_ret() :: ok | {error, {invalid_opts, map()}} | invalid_worker | invalid_state.

task_poller_limiter()

-type task_poller_limiter() ::
          #{limit := pos_integer() | infinity, time_window := temporal_sdk:time() | undefined}.

task_settings()

-type task_settings() :: activity_settings() | nexus_settings() | workflow_settings().

temporal_name_to_erlang()

-type temporal_name_to_erlang() ::
          fun((Cluster :: temporal_sdk_cluster:cluster_name(), TemporalTypeName :: unicode:chardata()) ->
                  {ok, module()} | {error, Reason :: term()}).

user_activity_settings()

-type user_activity_settings() ::
          [{data, temporal_sdk_activity:data()} |
           {last_heartbeat, temporal_sdk_activity:heartbeat()} |
           {heartbeat_timeout_ratio, float()} |
           {schedule_to_close_timeout_ratio, float()} |
           {start_to_close_timeout_ratio, float()}].

user_limiter_config()

-type user_limiter_config() ::
          [{task_poller_limiter, task_poller_limiter()} |
           {limits, temporal_sdk_limiter:levels_limits()} |
           {limiter_check_frequency, pos_integer()}].

user_limiter_time_windows_activity()

-type user_limiter_time_windows_activity() :: [{activity_regular, temporal_sdk_limiter:time_window()}].

user_limiter_time_windows_nexus()

-type user_limiter_time_windows_nexus() :: [{nexus, temporal_sdk_limiter:time_window()}].

user_limiter_time_windows_session()

-type user_limiter_time_windows_session() :: [{activity_session, temporal_sdk_limiter:time_window()}].

user_limiter_time_windows_workflow()

-type user_limiter_time_windows_workflow() ::
          [{activity_eager, temporal_sdk_limiter:time_window()} |
           {activity_regular, temporal_sdk_limiter:time_window()} |
           {workflow, temporal_sdk_limiter:time_window()}].

user_nexus_settings()

-type user_nexus_settings() ::
          [{data, temporal_sdk_nexus:data()} |
           {task_timeout_ratio, float()} |
           {error_type, unicode:chardata()}].

user_opts()

-type user_opts() ::
          [{worker_id, worker_id()} |
           {namespace, unicode:chardata()} |
           {task_queue, unicode:chardata() | session_task_queue_name_fun()} |
           {task_settings, task_settings() | user_task_settings()} |
           {worker_version, worker_version()} |
           {allowed_temporal_names, allowed_temporal_names()} |
           {allowed_erlang_modules, allowed_erlang_modules()} |
           {temporal_name_to_erlang, temporal_name_to_erlang()} |
           {task_poller_pool_size, pos_integer()} |
           {task_poller_limiter, task_poller_limiter()} |
           {limits, temporal_sdk_limiter:levels_limits() | temporal_sdk_limiter:user_levels_limits()} |
           {limiter_check_frequency, pos_integer()} |
           {limiter_time_windows,
            limiter_time_windows_activity() |
            limiter_time_windows_workflow() |
            limiter_time_windows_session() |
            limiter_time_windows_nexus() |
            user_limiter_time_windows_activity() |
            user_limiter_time_windows_workflow() |
            user_limiter_time_windows_session() |
            user_limiter_time_windows_nexus()} |
           {telemetry_poll_interval, temporal_sdk:time()}].

user_task_settings()

user_workflow_settings()

-type user_workflow_settings() ::
          [{execution_id, temporal_sdk_workflow:execution_id()} |
           {deterministic_check_mod, module()} |
           {run_timeout_ratio, float()} |
           {task_timeout_ratio, float()} |
           {sticky_execution_schedule_to_start_ratio, float()} |
           {maximum_page_size, pos_integer()} |
           {await_open_before_close, boolean()} |
           {otp_messages_limits,
            [{received, pos_integer() | infinity} |
             {recorded, pos_integer() | infinity} |
             {ignored, pos_integer() | infinity}]} |
           {eager_execution_settings, activity_settings() | user_activity_settings()} |
           {session_worker, opts() | user_opts() | boolean()}].

worker_id()

-type worker_id() :: atom() | unicode:chardata().

worker_type()

-type worker_type() :: activity | nexus | session | workflow.

worker_version()

workflow_settings()

-type workflow_settings() ::
          #{execution_id => temporal_sdk_workflow:execution_id(),
            deterministic_check_mod => module(),
            run_timeout_ratio => float(),
            task_timeout_ratio => float(),
            sticky_execution_schedule_to_start_ratio => float(),
            maximum_page_size => pos_integer(),
            await_open_before_close => boolean(),
            otp_messages_limits =>
                [{received, pos_integer() | infinity} |
                 {recorded, pos_integer() | infinity} |
                 {ignored, pos_integer() | infinity}],
            eager_execution_settings => activity_settings(),
            session_worker => opts() | boolean()}.

Functions

count(Cluster, WorkerType)

-spec count(Cluster :: temporal_sdk_cluster:cluster_name(), WorkerType :: activity | nexus | workflow) ->
               {ok, non_neg_integer()} | invalid_error().

get_limiter_config(Cluster, WorkerType, WorkerId)

-spec get_limiter_config(Cluster :: temporal_sdk_cluster:cluster_name(),
                         WorkerType :: worker_type(),
                         WorkerId :: worker_id()) ->
                            {ok, limiter_config()} | invalid_error().

Returns the current dynamic configuration of the rate limiter.

Example:

Elixir

iex(1)> TemporalSdk.Worker.start(:cluster_1, :activity, worker_id: "test_worker",
        task_queue: "test_tq", task_poller_pool_size: 1,
        limits: %{:worker => %{:activity_regular => {10, 600}}})
{:ok,
 %{
   task_queue: "test_tq",
   worker_id: "test_worker",
   task_poller_pool_size: 1,
   limits: %{
     node: %{},
     os: %{},
     worker: %{activity_regular: {10, 600}},
     cluster: %{}
   },
   task_poller_limiter: %{limit: :infinity, time_window: :undefined},
   limiter_check_frequency: 500,
   task_settings: %{
     data: :undefined,
     last_heartbeat: [:undefined],
     heartbeat_timeout_ratio: 0.8,
     schedule_to_close_timeout_ratio: 0.8,
     start_to_close_timeout_ratio: 0.8
   },
   namespace: ~c"default",
   telemetry_poll_interval: 10000,
   limiter_time_windows: %{activity_regular: 60000},
   worker_version: %{},
   temporal_name_to_erlang: &:temporal_sdk_api.temporal_name_to_erlang/2,
   allowed_erlang_modules: :all,
   allowed_temporal_names: :all
 }}
iex(2)> TemporalSdk.Worker.get_limiter_config(:cluster_1, :activity, "test_worker")
{:ok,
 %{
   limits: %{
     node: %{},
     os: %{},
     worker: %{activity_regular: {10, 600}},
     cluster: %{}
   },
   task_poller_limiter: %{limit: :infinity, time_window: :undefined},
   limiter_check_frequency: 500
 }}

Erlang

1> temporal_sdk_worker:start(cluster_1, activity, [{worker_id, "test_worker"},
   {task_queue, "test_tq"}, {task_poller_pool_size, 1},
   {limits, #{worker => #{activity_regular => {10, 600}}}}]).
{ok,#{namespace => "default",task_queue => "test_tq",
      telemetry_poll_interval => 10000,
      limiter_time_windows => #{activity_regular => 60000},
      worker_version => #{},worker_id => "test_worker",
      task_settings =>
          #{data => undefined,
            last_heartbeat => [undefined],
            heartbeat_timeout_ratio => 0.8,
            schedule_to_close_timeout_ratio => 0.8,
            start_to_close_timeout_ratio => 0.8},
      limits =>
          #{node => #{},os => #{},
            worker => #{activity_regular => {10,600}},
            cluster => #{}},
      task_poller_limiter =>
          #{limit => infinity,time_window => undefined},
      task_poller_pool_size => 1,limiter_check_frequency => 500,
      temporal_name_to_erlang =>
          fun temporal_sdk_api:temporal_name_to_erlang/2,
      allowed_erlang_modules => all,
      allowed_temporal_names => all}}
2> temporal_sdk_worker:get_limiter_config(cluster_1, activity, "test_worker").
{ok,#{limits =>
          #{node => #{},os => #{},
            worker => #{activity_regular => {10,600}},
            cluster => #{}},
      task_poller_limiter =>
          #{limit => infinity,time_window => undefined},
      limiter_check_frequency => 500}}

is_started(Cluster, WorkerType, WorkerId)

-spec is_started(Cluster :: temporal_sdk_cluster:cluster_name(),
                 WorkerType :: worker_type(),
                 WorkerId :: worker_id()) ->
                    boolean().

list(Cluster, WorkerType)

-spec list(Cluster :: temporal_sdk_cluster:cluster_name(), WorkerType :: activity | nexus | workflow) ->
              {ok, [worker_id()]} | invalid_error().

options(Cluster, WorkerType, WorkerId)

-spec options(Cluster :: temporal_sdk_cluster:cluster_name(),
              WorkerType :: worker_type(),
              WorkerId :: worker_id()) ->
                 {ok, opts()} | invalid_error().

set_limiter_config(Cluster, WorkerType, WorkerId, NewLimiterConfig)

-spec set_limiter_config(Cluster :: temporal_sdk_cluster:cluster_name(),
                         WorkerType :: worker_type(),
                         WorkerId :: worker_id(),
                         NewLimiterConfig :: limiter_config() | user_limiter_config()) ->
                            set_limiter_config_ret().

Updates the dynamic configuration of the rate limiter.

Each limiter level in limits defaults to #{}, which means that setting limits => #{} will reset all rate limiter concurrency and fixed window limits.

Example:

Elixir

iex(1)> TemporalSdk.Worker.start(:cluster_1, :activity, worker_id: "test_worker",
        task_queue: "test_tq", task_poller_pool_size: 1,
        limits: %{:worker => %{:activity_regular => {10, 600}}})
{:ok,
 %{
   task_queue: "test_tq",
   temporal_name_to_erlang: &:temporal_sdk_api.temporal_name_to_erlang/2,
   namespace: ~c"default",
   worker_version: %{},
   task_settings: %{
     data: :undefined,
     last_heartbeat: [:undefined],
     start_to_close_timeout_ratio: 0.8,
     schedule_to_close_timeout_ratio: 0.8,
     heartbeat_timeout_ratio: 0.8
   },
   worker_id: "test_worker",
   allowed_temporal_names: :all,
   allowed_erlang_modules: :all,
   limiter_time_windows: %{activity_regular: 60000},
   telemetry_poll_interval: 10000,
   limits: %{
     node: %{},
     os: %{},
     worker: %{activity_regular: {10, 600}},
     cluster: %{}
   },
   task_poller_pool_size: 1,
   task_poller_limiter: %{limit: :infinity, time_window: :undefined},
   limiter_check_frequency: 500
 }}
iex(2)> TemporalSdk.Worker.set_limiter_config(:cluster_1, :activity, "test_worker",
        limits: %{:worker => %{:activity_regular => {1, 60}}},
        task_poller_limiter: %{:limit => 60, :time_window => {1, :minute}})
:ok
iex(3)> TemporalSdk.Worker.get_limiter_config(:cluster_1, :activity, "test_worker")
{:ok,
 %{
   limits: %{
     node: %{},
     os: %{},
     worker: %{activity_regular: {1, 60}},
     cluster: %{}
   },
   task_poller_limiter: %{limit: 60, time_window: {1, :minute}},
   limiter_check_frequency: 500
 }}
iex(4)> TemporalSdk.Worker.set_limiter_config(:cluster_1, :activity, "test_worker",
        limits: %{})
:ok
iex(5)> TemporalSdk.Worker.get_limiter_config(:cluster_1, :activity, "test_worker")
{:ok,
 %{
   limits: %{node: %{}, os: %{}, worker: %{}, cluster: %{}},
   task_poller_limiter: %{limit: 60, time_window: {1, :minute}},
   limiter_check_frequency: 500
 }}

Erlang

1> temporal_sdk_worker:start(cluster_1, activity, [{worker_id, "test_worker"},
   {task_queue, "test_tq"}, {task_poller_pool_size, 1},
   {limits, #{worker => #{activity_regular => {10, 600}}}}]).
{ok,#{namespace => "default",worker_id => "test_worker",
      task_queue => "test_tq",
      task_settings =>
          #{data => undefined,
            last_heartbeat => [undefined],
            heartbeat_timeout_ratio => 0.8,
            schedule_to_close_timeout_ratio => 0.8,
            start_to_close_timeout_ratio => 0.8},
      worker_version => #{},allowed_temporal_names => all,
      allowed_erlang_modules => all,
      temporal_name_to_erlang =>
          fun temporal_sdk_api:temporal_name_to_erlang/2,
      task_poller_pool_size => 1,
      task_poller_limiter =>
          #{limit => infinity,time_window => undefined},
      limits =>
          #{node => #{},os => #{},
            worker => #{activity_regular => {10,600}},
            cluster => #{}},
      limiter_check_frequency => 500,
      limiter_time_windows => #{activity_regular => 60000},
      telemetry_poll_interval => 10000}}
2> temporal_sdk_worker:set_limiter_config(cluster_1, activity, "test_worker",
   [{limits, #{worker => #{activity_regular => {1, 60}}}},
   {task_poller_limiter, #{limit => 60, time_window => {1, minute}}}]).
ok
3> temporal_sdk_worker:get_limiter_config(cluster_1, activity, "test_worker").
{ok,#{task_poller_limiter =>
          #{limit => 60,time_window => {1,minute}},
      limits =>
          #{node => #{},os => #{},
            worker => #{activity_regular => {1,60}},
            cluster => #{}},
      limiter_check_frequency => 500}}
4> temporal_sdk_worker:set_limiter_config(cluster_1, activity, "test_worker", [{limits, #{}}]).
ok
5> temporal_sdk_worker:get_limiter_config(cluster_1, activity, "test_worker").
{ok,#{task_poller_limiter =>
          #{limit => 60,time_window => {1,minute}},
      limits =>
          #{node => #{},os => #{},worker => #{},cluster => #{}},
      limiter_check_frequency => 500}}

set_limiter_config(Cluster, WorkerType, WorkerId, NewLimiterConfig, Nodes)

-spec set_limiter_config(Cluster :: temporal_sdk_cluster:cluster_name(),
                         WorkerType :: worker_type(),
                         WorkerId :: worker_id(),
                         NewLimiterConfig :: limiter_config() | user_limiter_config(),
                         Nodes :: [node()]) ->
                            ok |
                            [{ok, set_limiter_config_ret()} | {error, {erpc, Reason :: term()}} | term()].

Updates the dynamic configuration of the rate limiter on the Erlang nodes list.

The function applies set_limiter_config/4 on each of the Erlang nodes in the provided list using erpc:multicall/4. Returns ok if the configuration update was successful on all nodes. Otherwise, returns the list of per-node results in the erpc:multicall/4 format, so that failures can be attributed to individual nodes.

Successful example:

Elixir

iex(a@host)1> n = Node.list() ++ [Node.self()]
[:b@host, :a@host]
iex(a@host)2> TemporalSdk.Worker.set_limiter_config(:cluster_1, :activity, "worker",
              [{:limits, %{}}], n)
:ok

Erlang

(a@host)1> N = nodes() ++ [node()].
[b@host,a@host]
(a@host)2> temporal_sdk_worker:set_limiter_config(cluster_1, activity, "worker", #{limits => #{}}, N).
ok

Example with 3 nodes, where the first node update is successful, the second node does not have a worker started, and the third node is not accessible:

Elixir

iex(a@host)1> n = [:b@host, :a@host, :c@host]
[:b@host, :a@host, :c@host]
iex(a@host)2> TemporalSdk.Worker.set_limiter_config(:cluster_1, :activity, "worker",
              [{:limits, %{}}], n)
[ok: :ok, ok: :invalid_worker, error: {:erpc, :noconnection}]

Erlang

(a@host)1> N = [b@host, a@host, c@host].
[b@host,a@host,c@host]
(a@host)2> temporal_sdk_worker:set_limiter_config(cluster_1, activity, "worker", #{limits => #{}}, N).
[{ok,ok},{ok,invalid_worker},{error,{erpc,noconnection}}]

start(Cluster, WorkerType, WorkerOpts)

-spec start(Cluster :: temporal_sdk_cluster:cluster_name(),
            WorkerType :: activity | nexus | workflow,
            WorkerOpts :: user_opts() | opts()) ->
               {ok, opts()} | {invalid_opts, map()} | invalid_error() | supervisor:startchild_ret().

start(Cluster, WorkerType, WorkerOpts, Nodes)

-spec start(Cluster :: temporal_sdk_cluster:cluster_name(),
            WorkerType :: activity | nexus | workflow,
            WorkerOpts :: user_opts() | opts(),
            Nodes :: [node()]) ->
               ok.

stats(Cluster, WorkerType, WorkerId)

-spec stats(Cluster :: temporal_sdk_cluster:cluster_name(),
            WorkerType :: worker_type(),
            WorkerId :: worker_id()) ->
               {ok, temporal_sdk_limiter:stats()} | invalid_error().

terminate(Cluster, WorkerType, WorkerId)

-spec terminate(Cluster :: temporal_sdk_cluster:cluster_name(),
                WorkerType :: activity | nexus | workflow,
                WorkerId :: worker_id()) ->
                   ok | {error, invalid_cluster | not_found | simple_one_for_one}.

terminate(Cluster, WorkerType, WorkerId, Nodes)

-spec terminate(Cluster :: temporal_sdk_cluster:cluster_name(),
                WorkerType :: activity | nexus | workflow,
                WorkerId :: worker_id(),
                Nodes :: [node()]) ->
                   ok.