evoq_middleware behaviour (evoq v1.14.1)

View Source

Middleware behaviour for the command dispatch pipeline.

Middleware can intercept commands at three stages:

- `before_dispatch`: before the command reaches the aggregate
- `after_dispatch`: after successful command execution
- `after_failure`: after command execution fails

Middleware can:

- Add data to the pipeline assigns
- Halt the pipeline (prevents further processing)
- Modify the response

Summary

Functions

Assign a value to the pipeline.

Chain a pipeline through a list of middleware modules.

Get an assigned value from the pipeline.

Get an assigned value with a default.

Get the pipeline response.

Halt the pipeline.

Check if the pipeline is halted.

Set the pipeline response.

Callbacks

after_dispatch/1

(optional)
-callback after_dispatch(Pipeline ::
                             #evoq_pipeline{command ::
                                                #evoq_command{command_id :: binary() | undefined,
                                                              command_type :: atom() | undefined,
                                                              aggregate_type :: atom() | undefined,
                                                              aggregate_id :: binary() | undefined,
                                                              payload :: map(),
                                                              metadata :: map(),
                                                              causation_id :: binary() | undefined,
                                                              correlation_id :: binary() | undefined,
                                                              idempotency_key :: binary() | undefined},
                                            context ::
                                                #evoq_execution_context{command_id :: binary(),
                                                                        causation_id :: binary() | undefined,
                                                                        correlation_id ::
                                                                            binary() | undefined,
                                                                        aggregate_id :: binary(),
                                                                        aggregate_type :: atom(),
                                                                        store_id :: atom(),
                                                                        expected_version :: integer(),
                                                                        retry_attempts :: non_neg_integer(),
                                                                        consistency ::
                                                                            eventual | strong |
                                                                            {handlers, [atom()]},
                                                                        timeout :: pos_integer(),
                                                                        metadata :: map()},
                                            assigns :: map(),
                                            halted :: boolean(),
                                            response :: term()}) ->
                            #evoq_pipeline{command ::
                                               #evoq_command{command_id :: binary() | undefined,
                                                             command_type :: atom() | undefined,
                                                             aggregate_type :: atom() | undefined,
                                                             aggregate_id :: binary() | undefined,
                                                             payload :: map(),
                                                             metadata :: map(),
                                                             causation_id :: binary() | undefined,
                                                             correlation_id :: binary() | undefined,
                                                             idempotency_key :: binary() | undefined},
                                           context ::
                                               #evoq_execution_context{command_id :: binary(),
                                                                       causation_id :: binary() | undefined,
                                                                       correlation_id ::
                                                                           binary() | undefined,
                                                                       aggregate_id :: binary(),
                                                                       aggregate_type :: atom(),
                                                                       store_id :: atom(),
                                                                       expected_version :: integer(),
                                                                       retry_attempts :: non_neg_integer(),
                                                                       consistency ::
                                                                           eventual | strong |
                                                                           {handlers, [atom()]},
                                                                       timeout :: pos_integer(),
                                                                       metadata :: map()},
                                           assigns :: map(),
                                           halted :: boolean(),
                                           response :: term()}.

after_failure/1

(optional)
-callback after_failure(Pipeline ::
                            #evoq_pipeline{command ::
                                               #evoq_command{command_id :: binary() | undefined,
                                                             command_type :: atom() | undefined,
                                                             aggregate_type :: atom() | undefined,
                                                             aggregate_id :: binary() | undefined,
                                                             payload :: map(),
                                                             metadata :: map(),
                                                             causation_id :: binary() | undefined,
                                                             correlation_id :: binary() | undefined,
                                                             idempotency_key :: binary() | undefined},
                                           context ::
                                               #evoq_execution_context{command_id :: binary(),
                                                                       causation_id :: binary() | undefined,
                                                                       correlation_id ::
                                                                           binary() | undefined,
                                                                       aggregate_id :: binary(),
                                                                       aggregate_type :: atom(),
                                                                       store_id :: atom(),
                                                                       expected_version :: integer(),
                                                                       retry_attempts :: non_neg_integer(),
                                                                       consistency ::
                                                                           eventual | strong |
                                                                           {handlers, [atom()]},
                                                                       timeout :: pos_integer(),
                                                                       metadata :: map()},
                                           assigns :: map(),
                                           halted :: boolean(),
                                           response :: term()}) ->
                           #evoq_pipeline{command ::
                                              #evoq_command{command_id :: binary() | undefined,
                                                            command_type :: atom() | undefined,
                                                            aggregate_type :: atom() | undefined,
                                                            aggregate_id :: binary() | undefined,
                                                            payload :: map(),
                                                            metadata :: map(),
                                                            causation_id :: binary() | undefined,
                                                            correlation_id :: binary() | undefined,
                                                            idempotency_key :: binary() | undefined},
                                          context ::
                                              #evoq_execution_context{command_id :: binary(),
                                                                      causation_id :: binary() | undefined,
                                                                      correlation_id :: binary() | undefined,
                                                                      aggregate_id :: binary(),
                                                                      aggregate_type :: atom(),
                                                                      store_id :: atom(),
                                                                      expected_version :: integer(),
                                                                      retry_attempts :: non_neg_integer(),
                                                                      consistency ::
                                                                          eventual | strong |
                                                                          {handlers, [atom()]},
                                                                      timeout :: pos_integer(),
                                                                      metadata :: map()},
                                          assigns :: map(),
                                          halted :: boolean(),
                                          response :: term()}.

before_dispatch/1

(optional)
-callback before_dispatch(Pipeline ::
                              #evoq_pipeline{command ::
                                                 #evoq_command{command_id :: binary() | undefined,
                                                               command_type :: atom() | undefined,
                                                               aggregate_type :: atom() | undefined,
                                                               aggregate_id :: binary() | undefined,
                                                               payload :: map(),
                                                               metadata :: map(),
                                                               causation_id :: binary() | undefined,
                                                               correlation_id :: binary() | undefined,
                                                               idempotency_key :: binary() | undefined},
                                             context ::
                                                 #evoq_execution_context{command_id :: binary(),
                                                                         causation_id ::
                                                                             binary() | undefined,
                                                                         correlation_id ::
                                                                             binary() | undefined,
                                                                         aggregate_id :: binary(),
                                                                         aggregate_type :: atom(),
                                                                         store_id :: atom(),
                                                                         expected_version :: integer(),
                                                                         retry_attempts :: non_neg_integer(),
                                                                         consistency ::
                                                                             eventual | strong |
                                                                             {handlers, [atom()]},
                                                                         timeout :: pos_integer(),
                                                                         metadata :: map()},
                                             assigns :: map(),
                                             halted :: boolean(),
                                             response :: term()}) ->
                             #evoq_pipeline{command ::
                                                #evoq_command{command_id :: binary() | undefined,
                                                              command_type :: atom() | undefined,
                                                              aggregate_type :: atom() | undefined,
                                                              aggregate_id :: binary() | undefined,
                                                              payload :: map(),
                                                              metadata :: map(),
                                                              causation_id :: binary() | undefined,
                                                              correlation_id :: binary() | undefined,
                                                              idempotency_key :: binary() | undefined},
                                            context ::
                                                #evoq_execution_context{command_id :: binary(),
                                                                        causation_id :: binary() | undefined,
                                                                        correlation_id ::
                                                                            binary() | undefined,
                                                                        aggregate_id :: binary(),
                                                                        aggregate_type :: atom(),
                                                                        store_id :: atom(),
                                                                        expected_version :: integer(),
                                                                        retry_attempts :: non_neg_integer(),
                                                                        consistency ::
                                                                            eventual | strong |
                                                                            {handlers, [atom()]},
                                                                        timeout :: pos_integer(),
                                                                        metadata :: map()},
                                            assigns :: map(),
                                            halted :: boolean(),
                                            response :: term()}.

Functions

assign(Key, Value, Evoq_pipeline)

-spec assign(atom(),
             term(),
             #evoq_pipeline{command ::
                                #evoq_command{command_id :: binary() | undefined,
                                              command_type :: atom() | undefined,
                                              aggregate_type :: atom() | undefined,
                                              aggregate_id :: binary() | undefined,
                                              payload :: map(),
                                              metadata :: map(),
                                              causation_id :: binary() | undefined,
                                              correlation_id :: binary() | undefined,
                                              idempotency_key :: binary() | undefined},
                            context ::
                                #evoq_execution_context{command_id :: binary(),
                                                        causation_id :: binary() | undefined,
                                                        correlation_id :: binary() | undefined,
                                                        aggregate_id :: binary(),
                                                        aggregate_type :: atom(),
                                                        store_id :: atom(),
                                                        expected_version :: integer(),
                                                        retry_attempts :: non_neg_integer(),
                                                        consistency ::
                                                            eventual | strong | {handlers, [atom()]},
                                                        timeout :: pos_integer(),
                                                        metadata :: map()},
                            assigns :: map(),
                            halted :: boolean(),
                            response :: term()}) ->
                #evoq_pipeline{command ::
                                   #evoq_command{command_id :: binary() | undefined,
                                                 command_type :: atom() | undefined,
                                                 aggregate_type :: atom() | undefined,
                                                 aggregate_id :: binary() | undefined,
                                                 payload :: map(),
                                                 metadata :: map(),
                                                 causation_id :: binary() | undefined,
                                                 correlation_id :: binary() | undefined,
                                                 idempotency_key :: binary() | undefined},
                               context ::
                                   #evoq_execution_context{command_id :: binary(),
                                                           causation_id :: binary() | undefined,
                                                           correlation_id :: binary() | undefined,
                                                           aggregate_id :: binary(),
                                                           aggregate_type :: atom(),
                                                           store_id :: atom(),
                                                           expected_version :: integer(),
                                                           retry_attempts :: non_neg_integer(),
                                                           consistency ::
                                                               eventual | strong | {handlers, [atom()]},
                                                           timeout :: pos_integer(),
                                                           metadata :: map()},
                               assigns :: map(),
                               halted :: boolean(),
                               response :: term()}.

Assign a value to the pipeline.

chain(Evoq_pipeline, Stage, Middleware)

-spec chain(#evoq_pipeline{command ::
                               #evoq_command{command_id :: binary() | undefined,
                                             command_type :: atom() | undefined,
                                             aggregate_type :: atom() | undefined,
                                             aggregate_id :: binary() | undefined,
                                             payload :: map(),
                                             metadata :: map(),
                                             causation_id :: binary() | undefined,
                                             correlation_id :: binary() | undefined,
                                             idempotency_key :: binary() | undefined},
                           context ::
                               #evoq_execution_context{command_id :: binary(),
                                                       causation_id :: binary() | undefined,
                                                       correlation_id :: binary() | undefined,
                                                       aggregate_id :: binary(),
                                                       aggregate_type :: atom(),
                                                       store_id :: atom(),
                                                       expected_version :: integer(),
                                                       retry_attempts :: non_neg_integer(),
                                                       consistency ::
                                                           eventual | strong | {handlers, [atom()]},
                                                       timeout :: pos_integer(),
                                                       metadata :: map()},
                           assigns :: map(),
                           halted :: boolean(),
                           response :: term()},
            atom(),
            [atom()]) ->
               #evoq_pipeline{command ::
                                  #evoq_command{command_id :: binary() | undefined,
                                                command_type :: atom() | undefined,
                                                aggregate_type :: atom() | undefined,
                                                aggregate_id :: binary() | undefined,
                                                payload :: map(),
                                                metadata :: map(),
                                                causation_id :: binary() | undefined,
                                                correlation_id :: binary() | undefined,
                                                idempotency_key :: binary() | undefined},
                              context ::
                                  #evoq_execution_context{command_id :: binary(),
                                                          causation_id :: binary() | undefined,
                                                          correlation_id :: binary() | undefined,
                                                          aggregate_id :: binary(),
                                                          aggregate_type :: atom(),
                                                          store_id :: atom(),
                                                          expected_version :: integer(),
                                                          retry_attempts :: non_neg_integer(),
                                                          consistency ::
                                                              eventual | strong | {handlers, [atom()]},
                                                          timeout :: pos_integer(),
                                                          metadata :: map()},
                              assigns :: map(),
                              halted :: boolean(),
                              response :: term()}.

Chain a pipeline through a list of middleware modules.

get_assign(Key, Evoq_pipeline)

-spec get_assign(atom(),
                 #evoq_pipeline{command ::
                                    #evoq_command{command_id :: binary() | undefined,
                                                  command_type :: atom() | undefined,
                                                  aggregate_type :: atom() | undefined,
                                                  aggregate_id :: binary() | undefined,
                                                  payload :: map(),
                                                  metadata :: map(),
                                                  causation_id :: binary() | undefined,
                                                  correlation_id :: binary() | undefined,
                                                  idempotency_key :: binary() | undefined},
                                context ::
                                    #evoq_execution_context{command_id :: binary(),
                                                            causation_id :: binary() | undefined,
                                                            correlation_id :: binary() | undefined,
                                                            aggregate_id :: binary(),
                                                            aggregate_type :: atom(),
                                                            store_id :: atom(),
                                                            expected_version :: integer(),
                                                            retry_attempts :: non_neg_integer(),
                                                            consistency ::
                                                                eventual | strong | {handlers, [atom()]},
                                                            timeout :: pos_integer(),
                                                            metadata :: map()},
                                assigns :: map(),
                                halted :: boolean(),
                                response :: term()}) ->
                    term() | undefined.

Get an assigned value from the pipeline.

get_assign(Key, Evoq_pipeline, Default)

-spec get_assign(atom(),
                 #evoq_pipeline{command ::
                                    #evoq_command{command_id :: binary() | undefined,
                                                  command_type :: atom() | undefined,
                                                  aggregate_type :: atom() | undefined,
                                                  aggregate_id :: binary() | undefined,
                                                  payload :: map(),
                                                  metadata :: map(),
                                                  causation_id :: binary() | undefined,
                                                  correlation_id :: binary() | undefined,
                                                  idempotency_key :: binary() | undefined},
                                context ::
                                    #evoq_execution_context{command_id :: binary(),
                                                            causation_id :: binary() | undefined,
                                                            correlation_id :: binary() | undefined,
                                                            aggregate_id :: binary(),
                                                            aggregate_type :: atom(),
                                                            store_id :: atom(),
                                                            expected_version :: integer(),
                                                            retry_attempts :: non_neg_integer(),
                                                            consistency ::
                                                                eventual | strong | {handlers, [atom()]},
                                                            timeout :: pos_integer(),
                                                            metadata :: map()},
                                assigns :: map(),
                                halted :: boolean(),
                                response :: term()},
                 term()) ->
                    term().

Get an assigned value with a default.

get_response(Evoq_pipeline)

-spec get_response(#evoq_pipeline{command ::
                                      #evoq_command{command_id :: binary() | undefined,
                                                    command_type :: atom() | undefined,
                                                    aggregate_type :: atom() | undefined,
                                                    aggregate_id :: binary() | undefined,
                                                    payload :: map(),
                                                    metadata :: map(),
                                                    causation_id :: binary() | undefined,
                                                    correlation_id :: binary() | undefined,
                                                    idempotency_key :: binary() | undefined},
                                  context ::
                                      #evoq_execution_context{command_id :: binary(),
                                                              causation_id :: binary() | undefined,
                                                              correlation_id :: binary() | undefined,
                                                              aggregate_id :: binary(),
                                                              aggregate_type :: atom(),
                                                              store_id :: atom(),
                                                              expected_version :: integer(),
                                                              retry_attempts :: non_neg_integer(),
                                                              consistency ::
                                                                  eventual | strong |
                                                                  {handlers, [atom()]},
                                                              timeout :: pos_integer(),
                                                              metadata :: map()},
                                  assigns :: map(),
                                  halted :: boolean(),
                                  response :: term()}) ->
                      term().

Get the pipeline response.

halt(Evoq_pipeline)

-spec halt(#evoq_pipeline{command ::
                              #evoq_command{command_id :: binary() | undefined,
                                            command_type :: atom() | undefined,
                                            aggregate_type :: atom() | undefined,
                                            aggregate_id :: binary() | undefined,
                                            payload :: map(),
                                            metadata :: map(),
                                            causation_id :: binary() | undefined,
                                            correlation_id :: binary() | undefined,
                                            idempotency_key :: binary() | undefined},
                          context ::
                              #evoq_execution_context{command_id :: binary(),
                                                      causation_id :: binary() | undefined,
                                                      correlation_id :: binary() | undefined,
                                                      aggregate_id :: binary(),
                                                      aggregate_type :: atom(),
                                                      store_id :: atom(),
                                                      expected_version :: integer(),
                                                      retry_attempts :: non_neg_integer(),
                                                      consistency ::
                                                          eventual | strong | {handlers, [atom()]},
                                                      timeout :: pos_integer(),
                                                      metadata :: map()},
                          assigns :: map(),
                          halted :: boolean(),
                          response :: term()}) ->
              #evoq_pipeline{command ::
                                 #evoq_command{command_id :: binary() | undefined,
                                               command_type :: atom() | undefined,
                                               aggregate_type :: atom() | undefined,
                                               aggregate_id :: binary() | undefined,
                                               payload :: map(),
                                               metadata :: map(),
                                               causation_id :: binary() | undefined,
                                               correlation_id :: binary() | undefined,
                                               idempotency_key :: binary() | undefined},
                             context ::
                                 #evoq_execution_context{command_id :: binary(),
                                                         causation_id :: binary() | undefined,
                                                         correlation_id :: binary() | undefined,
                                                         aggregate_id :: binary(),
                                                         aggregate_type :: atom(),
                                                         store_id :: atom(),
                                                         expected_version :: integer(),
                                                         retry_attempts :: non_neg_integer(),
                                                         consistency ::
                                                             eventual | strong | {handlers, [atom()]},
                                                         timeout :: pos_integer(),
                                                         metadata :: map()},
                             assigns :: map(),
                             halted :: boolean(),
                             response :: term()}.

Halt the pipeline, preventing further processing. Returns an #evoq_pipeline{} record.

halted(Evoq_pipeline)

-spec halted(#evoq_pipeline{command ::
                                #evoq_command{command_id :: binary() | undefined,
                                              command_type :: atom() | undefined,
                                              aggregate_type :: atom() | undefined,
                                              aggregate_id :: binary() | undefined,
                                              payload :: map(),
                                              metadata :: map(),
                                              causation_id :: binary() | undefined,
                                              correlation_id :: binary() | undefined,
                                              idempotency_key :: binary() | undefined},
                            context ::
                                #evoq_execution_context{command_id :: binary(),
                                                        causation_id :: binary() | undefined,
                                                        correlation_id :: binary() | undefined,
                                                        aggregate_id :: binary(),
                                                        aggregate_type :: atom(),
                                                        store_id :: atom(),
                                                        expected_version :: integer(),
                                                        retry_attempts :: non_neg_integer(),
                                                        consistency ::
                                                            eventual | strong | {handlers, [atom()]},
                                                        timeout :: pos_integer(),
                                                        metadata :: map()},
                            assigns :: map(),
                            halted :: boolean(),
                            response :: term()}) ->
                boolean().

Check whether the pipeline is halted. Returns a boolean().

respond(Response, Evoq_pipeline)

-spec respond(term(),
              #evoq_pipeline{command ::
                                 #evoq_command{command_id :: binary() | undefined,
                                               command_type :: atom() | undefined,
                                               aggregate_type :: atom() | undefined,
                                               aggregate_id :: binary() | undefined,
                                               payload :: map(),
                                               metadata :: map(),
                                               causation_id :: binary() | undefined,
                                               correlation_id :: binary() | undefined,
                                               idempotency_key :: binary() | undefined},
                             context ::
                                 #evoq_execution_context{command_id :: binary(),
                                                         causation_id :: binary() | undefined,
                                                         correlation_id :: binary() | undefined,
                                                         aggregate_id :: binary(),
                                                         aggregate_type :: atom(),
                                                         store_id :: atom(),
                                                         expected_version :: integer(),
                                                         retry_attempts :: non_neg_integer(),
                                                         consistency ::
                                                             eventual | strong | {handlers, [atom()]},
                                                         timeout :: pos_integer(),
                                                         metadata :: map()},
                             assigns :: map(),
                             halted :: boolean(),
                             response :: term()}) ->
                 #evoq_pipeline{command ::
                                    #evoq_command{command_id :: binary() | undefined,
                                                  command_type :: atom() | undefined,
                                                  aggregate_type :: atom() | undefined,
                                                  aggregate_id :: binary() | undefined,
                                                  payload :: map(),
                                                  metadata :: map(),
                                                  causation_id :: binary() | undefined,
                                                  correlation_id :: binary() | undefined,
                                                  idempotency_key :: binary() | undefined},
                                context ::
                                    #evoq_execution_context{command_id :: binary(),
                                                            causation_id :: binary() | undefined,
                                                            correlation_id :: binary() | undefined,
                                                            aggregate_id :: binary(),
                                                            aggregate_type :: atom(),
                                                            store_id :: atom(),
                                                            expected_version :: integer(),
                                                            retry_attempts :: non_neg_integer(),
                                                            consistency ::
                                                                eventual | strong | {handlers, [atom()]},
                                                            timeout :: pos_integer(),
                                                            metadata :: map()},
                                assigns :: map(),
                                halted :: boolean(),
                                response :: term()}.

Set the pipeline response to Response (any term()). Returns an #evoq_pipeline{} record.