temporal_sdk (temporal_sdk v0.2.11)
View SourceCommon Temporal commands and SDK utility functions module.
Summary
Utility functions
Evicts workflow execution.
Types
-type application_failure() :: #{source => serializable(), message => serializable(), stack_trace => serializable(), encoded_attributes => term_to_payload(), type => unicode:chardata(), non_retryable => boolean(), details => term_to_payloads(), next_retry_delay => time()}.
-type application_failure_as_list() :: [{source, serializable()} | {message, serializable()} | {stack_trace, serializable()} | {encoded_attributes, term_to_payload()} | {type, unicode:chardata()} | {non_retryable, boolean()} | non_retryable | {details, term_to_payloads()} | {next_retry_delay, time()}].
-type call_response_error() :: temporal_sdk_client:call_result_error().
-type convertable() :: dynamic().
-type deconverted() :: dynamic().
-type failure_from_temporal() :: #{source => unicode:chardata(), message => unicode:chardata(), stack_trace => unicode:chardata(), encoded_attributes => term_from_payload(), failure_info => {atom(), map()} | {application_failure_info, #{type => unicode:chardata(), non_retryable => boolean(), details => term_from_payloads(), next_retry_delay => pos_integer()}}}.
-type response() :: {ok, FormattedResponse :: map()} | {error, FormattedGrpcResponseErrorMessage :: unicode:chardata()} | temporal_sdk_client:result().
-type response_type() :: call_formatted | call | cast | msg.
-type retry_policy() :: #{initial_interval => pos_integer(), backoff_coefficient => number(), maximum_interval => pos_integer(), maximum_attempts => pos_integer(), non_retryable_error_types => [unicode:chardata()]}.
-type serializable() :: dynamic().
-type temporal_mapstring_payload() :: #{unicode:chardata() => temporal_payload()}.
-type temporal_mapstring_payloads() :: #{unicode:chardata() => temporal_payloads()}.
-type temporal_payload() :: temporal_sdk_proto_service_workflow_binaries:'temporal.api.common.v1.Payload'().
-type temporal_payloads() :: temporal_sdk_proto_service_workflow_binaries:'temporal.api.common.v1.Payloads'().
-type term_from_mapstring_payload() :: #{unicode:chardata() => term_from_payload()}.
-type term_from_mapstring_payloads() :: #{unicode:chardata() => term_from_payloads()}.
-type term_from_payload() :: deconverted().
-type term_from_payloads() :: [deconverted()].
-type term_to_mapstring_payload() :: #{unicode:chardata() | atom() => term_to_payload()}.
-type term_to_mapstring_payloads() :: #{unicode:chardata() | atom() => term_to_payloads()}.
-type term_to_payload() :: convertable().
-type term_to_payloads() :: [convertable()].
-type time() :: {TimeLength :: number(), TimeUnit :: time_unit()} | (TimeMilliseconds :: non_neg_integer()).
-type time_unit() :: millisecond | second | minute | hour | day.
-type user_metadata() :: #{summary => term_to_payload(), details => term_to_payload()}.
-type workflow_execution() :: #{workflow_id := unicode:chardata(), run_id => unicode:chardata()}.
-type workflow_execution_or_id() :: workflow_execution() | (WorkflowId :: unicode:chardata()).
Utility functions
-spec evict_workflow(Cluster :: temporal_sdk_cluster:cluster_name(), WorkflowExecutionOrId :: temporal_sdk:workflow_execution_or_id()) -> ok | temporal_sdk_client:call_result_error().
-spec evict_workflow(Cluster :: temporal_sdk_cluster:cluster_name(), WorkflowExecutionOrId :: temporal_sdk:workflow_execution_or_id(), Opts :: temporal_sdk:evict_workflow_opts()) -> ok | temporal_sdk_client:call_result_error().
Evicts workflow execution.
-type evict_workflow_opts() :: [{namespace, unicode:chardata()} | {reason, atom()}].
-spec format_response(Cluster :: temporal_sdk_cluster:cluster_name(), MessageName :: temporal_sdk_client:msg_name(), Response :: {ok, temporal_sdk_client:msg()} | {error, term()}) -> response().
-spec replay_file(Cluster :: temporal_sdk_cluster:cluster_name(), WorkflowMod :: module(), Filename :: file:name_all()) -> replay_json_ret() | {error, Reason :: file:posix() | badarg | terminated | system_limit}.
-spec replay_file(Cluster :: temporal_sdk_cluster:cluster_name(), WorkflowMod :: module(), Filename :: file:name_all(), Opts :: replay_workflow_opts()) -> replay_json_ret() | {error, Reason :: file:posix() | badarg | terminated | system_limit}.
-spec replay_json(Cluster :: temporal_sdk_cluster:cluster_name(), WorkflowMod :: module(), Json :: unicode:chardata()) -> replay_json_ret().
-spec replay_json(Cluster :: temporal_sdk_cluster:cluster_name(), WorkflowMod :: module(), Json :: unicode:chardata(), Opts :: replay_workflow_opts()) -> replay_json_ret().
-type replay_json_ret() :: {ok, replay_workflow_ret()} | {error, Reason :: timeout | unclosed_history | malformed_history | invalid_cluster | {invalid_opts, map()} | {nondeterministic, map()} | term()}.
-spec replay_task(Cluster :: temporal_sdk_cluster:cluster_name(), TaskQueue :: unicode:chardata(), WorkflowType :: atom() | unicode:chardata(), WorkflowMod :: module()) -> replay_json_ret() | call_response_error().
-spec replay_task(Cluster :: temporal_sdk_cluster:cluster_name(), TaskQueue :: unicode:chardata(), WorkflowType :: atom() | unicode:chardata(), WorkflowMod :: module(), Opts :: replay_task_opts()) -> replay_json_ret() | {ok, replay_workflow_ret(), file:name_all()} | call_response_error().
-type replay_task_opts() :: [{start_workflow_opts, start_workflow_opts()} | {replay_workflow_opts, replay_workflow_opts()} | {history_file, boolean() | file:name_all()} | history_file | {history_file_write_modes, [file:mode()]}].
-type replay_workflow_opts() :: [{timeout, erlang:timeout()} | {client_opts, temporal_sdk_client:opts()} | {worker_id, term()} | {worker_opts, term()} | {task_overwrites, [{input, term_to_payloads()}]}].
-type replay_workflow_ret() :: {completed, Result :: term_to_payloads()} | {canceled, Details :: term_to_payloads()} | {failed, Failure :: application_failure() | application_failure_as_list()} | {error, Error :: temporal_sdk_telemetry:exception()}.
Workflow commands
-spec await_workflow(Cluster :: temporal_sdk_cluster:cluster_name(), WorkflowExecutionOrId :: workflow_execution_or_id()) -> {ok, workflow_result()} | {error, Reason :: map() | invalid_cluster} | call_response_error().
-spec await_workflow(Cluster :: temporal_sdk_cluster:cluster_name(), WorkflowExecutionOrId :: workflow_execution_or_id(), Opts :: await_workflow_opts()) -> {ok, workflow_result()} | {error, Reason :: map() | invalid_cluster} | call_response_error().
-type await_workflow_opts() :: [{namespace, unicode:chardata()} | {timeout, time() | infinity}].
-spec cancel_workflow(Cluster :: temporal_sdk_cluster:cluster_name(), WorkflowExecutionOrId :: workflow_execution_or_id()) -> {ok, temporal_sdk_proto_service_workflow_binaries:'temporal.api.workflowservice.v1.RequestCancelWorkflowExecutionResponse'()} | response().
-spec cancel_workflow(Cluster :: temporal_sdk_cluster:cluster_name(), WorkflowExecutionOrId :: workflow_execution_or_id(), Opts :: cancel_workflow_opts()) -> {ok, temporal_sdk_proto_service_workflow_binaries:'temporal.api.workflowservice.v1.RequestCancelWorkflowExecutionResponse'()} | response().
-type cancel_workflow_opts() :: [{namespace, unicode:chardata()} | {identity, unicode:chardata()} | {request_id, unicode:chardata()} | {first_execution_run_id, unicode:chardata()} | {reason, unicode:chardata()} | {grpc_opts, temporal_sdk_client:grpc_opts()} | {raw_request, temporal_sdk_proto_service_workflow_binaries:'temporal.api.workflowservice.v1.RequestCancelWorkflowExecutionRequest'()} | {response_type, response_type()}].
-spec delete_workflow(Cluster :: temporal_sdk_cluster:cluster_name(), WorkflowExecutionOrId :: workflow_execution_or_id()) -> {ok, temporal_sdk_proto_service_workflow_binaries:'temporal.api.workflowservice.v1.DeleteWorkflowExecutionResponse'()} | response().
-spec delete_workflow(Cluster :: temporal_sdk_cluster:cluster_name(), WorkflowExecutionOrId :: workflow_execution_or_id(), Opts :: delete_workflow_opts()) -> {ok, temporal_sdk_proto_service_workflow_binaries:'temporal.api.workflowservice.v1.DeleteWorkflowExecutionResponse'()} | response().
Deletes workflow execution.
SDK Samples Workflow Delete demonstrates command use.
-type delete_workflow_opts() :: [{namespace, unicode:chardata()} | {grpc_opts, temporal_sdk_client:grpc_opts()} | {raw_request, temporal_sdk_proto_service_workflow_binaries:'temporal.api.workflowservice.v1.DeleteWorkflowExecutionRequest'()} | {response_type, response_type()}].
-spec describe_workflow(Cluster :: temporal_sdk_cluster:cluster_name(), WorkflowExecutionOrId :: workflow_execution_or_id()) -> {ok, temporal_sdk_proto_service_workflow_binaries:'temporal.api.workflowservice.v1.DescribeWorkflowExecutionResponse'()} | response().
-spec describe_workflow(Cluster :: temporal_sdk_cluster:cluster_name(), WorkflowExecutionOrId :: workflow_execution_or_id(), Opts :: describe_workflow_opts()) -> {ok, temporal_sdk_proto_service_workflow_binaries:'temporal.api.workflowservice.v1.DescribeWorkflowExecutionResponse'()} | response().
-type describe_workflow_opts() :: [{namespace, unicode:chardata()} | {grpc_opts, temporal_sdk_client:grpc_opts()} | {raw_request, temporal_sdk_proto_service_workflow_binaries:'temporal.api.workflowservice.v1.DescribeWorkflowExecutionRequest'()} | {response_type, response_type()}].
-spec get_workflow_history(Cluster :: temporal_sdk_cluster:cluster_name(), WorkflowExecutionOrId :: workflow_execution_or_id()) -> {ok, [temporal_sdk_proto_service_workflow_binaries:'temporal.api.history.v1.HistoryEvent'(), ...]} | call_response_error() | {error, Reason :: term()}.
-spec get_workflow_history(Cluster :: temporal_sdk_cluster:cluster_name(), WorkflowExecutionOrId :: workflow_execution_or_id(), Opts :: get_workflow_history_opts()) -> {ok, temporal_sdk_proto_service_workflow_binaries:'temporal.api.workflowservice.v1.GetWorkflowExecutionHistoryResponse'()} | {ok, [temporal_sdk_proto_service_workflow_binaries:'temporal.api.history.v1.HistoryEvent'(), ...]} | {ok, [temporal_sdk_proto_service_workflow_binaries:'temporal.api.history.v1.HistoryEvent'(), ...], Json :: iodata(), file:name_all()} | {ok, [temporal_sdk_proto_service_workflow_binaries:'temporal.api.history.v1.HistoryEvent'(), ...], Json :: iodata()} | call_response_error() | {error, Reason :: file:posix() | badarg | terminated | system_limit | timeout | map() | term()}.
-type get_workflow_history_opts() :: [{namespace, unicode:chardata()} | {maximum_page_size, pos_integer()} | {next_page_token, binary()} | {wait_new_event, boolean()} | {history_event_filter_type, temporal_sdk_proto_service_workflow_binaries:'temporal.api.enums.v1.HistoryEventFilterType'()} | {grpc_opts, temporal_sdk_client:grpc_opts()} | {raw_request, temporal_sdk_proto_service_workflow_binaries:'temporal.api.workflowservice.v1.GetWorkflowExecutionHistoryRequest'()} | {timeout, time() | infinity} | {await_all, boolean()} | await_all | {await_all_close, boolean()} | await_all_close | {await_close, boolean()} | await_close | {history_file, boolean() | file:name_all()} | history_file | {history_file_write_modes, [file:mode()]} | {json, boolean()} | json].
-spec get_workflow_state(Cluster :: temporal_sdk_cluster:cluster_name(), WorkflowExecutionOrId :: workflow_execution_or_id()) -> get_workflow_state_ret().
-spec get_workflow_state(Cluster :: temporal_sdk_cluster:cluster_name(), WorkflowExecutionOrId :: workflow_execution_or_id(), Opts :: describe_workflow_opts()) -> get_workflow_state_ret().
-type get_workflow_state_ret() :: {ok, completed | failed | timed_out | terminated | canceled | continued_as_new | unspecified | running} | call_response_error().
-spec query_workflow(Cluster :: temporal_sdk_cluster:cluster_name(), WorkflowExecutionOrId :: workflow_execution_or_id(), QueryType :: unicode:chardata()) -> {ok, temporal_sdk_proto_service_workflow_binaries:'temporal.api.workflowservice.v1.QueryWorkflowResponse'()} | response().
-spec query_workflow(Cluster :: temporal_sdk_cluster:cluster_name(), WorkflowExecutionOrId :: workflow_execution_or_id(), QueryType :: unicode:chardata(), Opts :: query_workflow_opts()) -> {ok, temporal_sdk_proto_service_workflow_binaries:'temporal.api.workflowservice.v1.QueryWorkflowResponse'()} | response().
-type query_workflow_opts() :: [{namespace, unicode:chardata()} | {query_args, term_to_payloads()} | {header, term_to_mapstring_payload()} | {query_reject_condition, temporal_sdk_proto_service_workflow_binaries:'temporal.api.enums.v1.QueryRejectCondition'()} | {grpc_opts, temporal_sdk_client:grpc_opts()} | {raw_request, temporal_sdk_proto_service_workflow_binaries:'temporal.api.workflowservice.v1.QueryWorkflowRequest'()} | {response_type, response_type()}].
-spec reset_workflow(Cluster :: temporal_sdk_cluster:cluster_name(), WorkflowExecutionOrId :: workflow_execution_or_id()) -> {ok, temporal_sdk_proto_service_workflow_binaries:'temporal.api.workflowservice.v1.ResetWorkflowExecutionResponse'()} | response().
-spec reset_workflow(Cluster :: temporal_sdk_cluster:cluster_name(), WorkflowExecutionOrId :: workflow_execution_or_id(), Opts :: reset_workflow_opts()) -> {ok, temporal_sdk_proto_service_workflow_binaries:'temporal.api.workflowservice.v1.ResetWorkflowExecutionResponse'()} | response().
-type reset_workflow_opts() :: [{namespace, unicode:chardata()} | {reason, unicode:chardata()} | {workflow_task_finish_event_id, pos_integer()} | {request_id, unicode:chardata()} | {reset_reapply_exclude_types, temporal_sdk_proto_service_workflow_binaries:'temporal.api.enums.v1.ResetReapplyExcludeType'()} | {post_reset_operations, list()} | {identity, unicode:chardata()} | {grpc_opts, temporal_sdk_client:grpc_opts()} | {raw_request, temporal_sdk_proto_service_workflow_binaries:'temporal.api.workflowservice.v1.ResetWorkflowExecutionRequest'()} | {response_type, response_type()}].
-spec signal_workflow(Cluster :: temporal_sdk_cluster:cluster_name(), WorkflowExecutionOrId :: workflow_execution_or_id(), SignalName :: unicode:chardata()) -> {ok, temporal_sdk_proto_service_workflow_binaries:'temporal.api.workflowservice.v1.SignalWorkflowExecutionResponse'()} | response().
-spec signal_workflow(Cluster :: temporal_sdk_cluster:cluster_name(), WorkflowExecutionOrId :: workflow_execution_or_id(), SignalName :: unicode:chardata(), Opts :: signal_workflow_opts()) -> {ok, temporal_sdk_proto_service_workflow_binaries:'temporal.api.workflowservice.v1.SignalWorkflowExecutionResponse'()} | response().
-type signal_workflow_opts() :: [{namespace, unicode:chardata()} | {input, term_to_payloads()} | {identity, unicode:chardata()} | {request_id, unicode:chardata()} | {header, term_to_mapstring_payload()} | {grpc_opts, temporal_sdk_client:grpc_opts()} | {raw_request, temporal_sdk_proto_service_workflow_binaries:'temporal.api.workflowservice.v1.SignalWorkflowExecutionRequest'()} | {response_type, response_type()}].
-spec start_workflow(Cluster :: temporal_sdk_cluster:cluster_name(), TaskQueue :: unicode:chardata(), WorkflowType :: atom() | unicode:chardata()) -> {ok, start_workflow_ret()} | {error, Reason :: map() | invalid_cluster} | call_response_error().
-spec start_workflow(Cluster :: temporal_sdk_cluster:cluster_name(), TaskQueue :: unicode:chardata(), WorkflowType :: atom() | unicode:chardata(), Opts :: start_workflow_opts()) -> {ok, start_workflow_ret(), workflow_result()} | {start_workflow_ret(), workflow_result()} | no_return() | {ok, start_workflow_ret()} | {error, Reason :: map() | invalid_cluster} | call_response_error().
-type start_workflow_opts() :: [{namespace, unicode:chardata()} | {workflow_id, unicode:chardata()} | {input, term_to_payloads()} | {workflow_execution_timeout, time()} | {workflow_run_timeout, time()} | {workflow_task_timeout, time()} | {identity, unicode:chardata()} | {request_id, unicode:chardata()} | {workflow_id_reuse_policy, temporal_sdk_proto_service_workflow_binaries:'temporal.api.enums.v1.WorkflowIdReusePolicy'()} | {workflow_id_conflict_policy, temporal_sdk_proto_service_workflow_binaries:'temporal.api.enums.v1.WorkflowIdConflictPolicy'()} | {signal_name, unicode:chardata()} | {signal_input, term_to_payloads()} | {retry_policy, retry_policy()} | {cron_schedule, unicode:chardata()} | {memo, term_to_mapstring_payload()} | {search_attributes, term_to_mapstring_payload()} | {header, term_to_mapstring_payload()} | {request_eager_execution, boolean() | atom()} | request_eager_execution | {workflow_start_delay, time()} | {user_metadata, user_metadata()} | {versioning_override, temporal_sdk_proto_service_workflow_binaries:'temporal.api.workflow.v1.VersioningOverride'()} | {on_conflict_options, temporal_sdk_proto_service_workflow_binaries:'temporal.api.workflow.v1.OnConflictOptions'()} | {priority, temporal_sdk_proto_service_workflow_binaries:'temporal.api.common.v1.Priority'()} | {eager_worker_deployment_options, temporal_sdk_proto_service_workflow_binaries:'temporal.api.deployment.v1.WorkerDeploymentOptions'()} | {time_skipping_config, temporal_sdk_proto_service_workflow_binaries:'temporal.api.workflow.v1.TimeSkippingConfig'()} | {eager_worker_id, temporal_sdk_worker:id()} | {raw_request, temporal_sdk_proto_service_workflow_binaries:'temporal.api.workflowservice.v1.StartWorkflowExecutionRequest'()} | {await, true | time() | infinity} | await | {wait, true | time() | infinity} | wait].
-type start_workflow_ret() :: #{request_id := unicode:chardata(), started := boolean(), workflow_execution := workflow_execution()}.
-spec terminate_workflow(Cluster :: temporal_sdk_cluster:cluster_name(), WorkflowExecutionOrId :: workflow_execution_or_id()) -> {ok, temporal_sdk_proto_service_workflow_binaries:'temporal.api.workflowservice.v1.TerminateWorkflowExecutionResponse'()} | response().
-spec terminate_workflow(Cluster :: temporal_sdk_cluster:cluster_name(), WorkflowExecutionOrId :: workflow_execution_or_id(), Opts :: terminate_workflow_opts()) -> {ok, temporal_sdk_proto_service_workflow_binaries:'temporal.api.workflowservice.v1.TerminateWorkflowExecutionResponse'()} | response().
Terminates workflow execution.
In most cases, it is recommended to use the cancel_workflow/3 command instead.
SDK Samples Workflow Terminate demonstrates command use.
-type terminate_workflow_opts() :: [{namespace, unicode:chardata()} | {reason, unicode:chardata()} | {details, term_to_payloads()} | {identity, unicode:chardata()} | {first_execution_run_id, unicode:chardata()} | {grpc_opts, temporal_sdk_client:grpc_opts()} | {raw_request, temporal_sdk_proto_service_workflow_binaries:'temporal.api.workflowservice.v1.TerminateWorkflowExecutionRequest'()} | {response_type, response_type()}].
-type update_workflow_opts() :: [{namespace, unicode:chardata()} | {first_execution_run_id, unicode:chardata()} | {wait_for_stage, temporal_sdk_proto_service_workflow_binaries:'temporal.api.enums.v1.UpdateWorkflowExecutionLifecycleStage'()} | {update_id, unicode:chardata()} | {identity, unicode:chardata()} | {header, term_to_mapstring_payload()} | {args, term_to_payloads()} | {grpc_opts, temporal_sdk_client:grpc_opts()} | {raw_request, temporal_sdk_proto_service_workflow_binaries:'temporal.api.workflowservice.v1.UpdateWorkflowExecutionRequest'()} | {response_type, response_type()}].
-spec wait_workflow(Cluster :: temporal_sdk_cluster:cluster_name(), WorkflowExecutionOrId :: workflow_execution_or_id()) -> workflow_result() | no_return().
-spec wait_workflow(Cluster :: temporal_sdk_cluster:cluster_name(), WorkflowExecutionOrId :: workflow_execution_or_id(), Opts :: await_workflow_opts()) -> workflow_result() | no_return().
-type workflow_result() :: {completed, temporal_sdk_proto_service_workflow_binaries:'temporal.api.history.v1.WorkflowExecutionCompletedEventAttributes'()} | {canceled, temporal_sdk_proto_service_workflow_binaries:'temporal.api.history.v1.WorkflowExecutionCanceledEventAttributes'()} | {failed, temporal_sdk_proto_service_workflow_binaries:'temporal.api.history.v1.WorkflowExecutionFailedEventAttributes'()} | {continued_as_new, temporal_sdk_proto_service_workflow_binaries:'temporal.api.history.v1.WorkflowExecutionContinuedAsNewEventAttributes'()} | {timed_out, temporal_sdk_proto_service_workflow_binaries:'temporal.api.history.v1.WorkflowExecutionTimedOutEventAttributes'()} | {terminated, temporal_sdk_proto_service_workflow_binaries:'temporal.api.history.v1.WorkflowExecutionTerminatedEventAttributes'()} | {continued_as_new, temporal_sdk_proto_service_workflow_binaries:'temporal.api.history.v1.WorkflowExecutionContinuedAsNewEventAttributes'()}.