When we run a flow, we run it in the context of a contact and/or a conversation (or other Glific data types). Let's encapsulate this in a module and isolate the flow from the other aspects of Glific.

@type t() :: %Glific.Flows.FlowContext{
  __meta__: Ecto.Schema.Metadata.t(),
  completed_at: :utc_datetime | nil,
  contact: Glific.Contacts.Contact.t() | Ecto.Association.NotLoaded.t() | nil,
  contact_id: non_neg_integer() | nil,
  delay: integer(),
  flow: Glific.Flows.Flow.t() | Ecto.Association.NotLoaded.t() | nil,
  flow_id: non_neg_integer() | nil,
  flow_uuid: Ecto.UUID.t() | nil,
  id: term(),
  inserted_at: :utc_datetime | nil,
  is_await_result: boolean(),
  is_background_flow: boolean(),
  is_killed: boolean(),
  last_message: Glific.Messages.Message.t() | nil,
  message_broadcast:
    Glific.Messages.Message.t() | Ecto.Association.NotLoaded.t() | nil,
  message_broadcast_id: non_neg_integer() | nil,
  node: Glific.Flows.Node.t() | nil,
  node_uuid: Ecto.UUID.t() | nil,
  organization:
    Glific.Partners.Organization.t() | Ecto.Association.NotLoaded.t() | nil,
  organization_id: non_neg_integer() | nil,
  parent: t() | Ecto.Association.NotLoaded.t() | nil,
  parent_id: non_neg_integer() | nil,
  profile: Glific.Profiles.Profile.t() | Ecto.Association.NotLoaded.t() | nil,
  profile_id: non_neg_integer() | nil,
  recent_inbound: [map()] | [],
  recent_outbound: [map()] | [],
  results: map() | nil,
  status: String.t() | nil,
  updated_at: :utc_datetime | nil,
  uuid_map: map() | nil,
  uuids_seen: map(),
  wakeup_at: :utc_datetime | nil
}

active_context(contact_id, parent_id \\ nil)

@spec active_context(non_neg_integer(), non_neg_integer() | nil) :: t() | nil

Check whether there is an active context for this contact (i.e. one with a non-null node_uuid).
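
As a rough illustration of the check described above, the sketch below searches an in-memory list instead of querying the database. The module name `ActiveContextSketch` and the plain maps are assumptions made for the example; this is not Glific's implementation, and it ignores the optional `parent_id` argument.

```elixir
defmodule ActiveContextSketch do
  # Hypothetical stand-in for the database lookup: returns the first context
  # for the contact that still has a node_uuid, or nil when there is none.
  def active_context(contexts, contact_id) do
    Enum.find(contexts, fn ctx ->
      ctx.contact_id == contact_id and not is_nil(ctx.node_uuid)
    end)
  end
end

contexts = [
  %{id: 1, contact_id: 10, node_uuid: nil},
  %{id: 2, contact_id: 10, node_uuid: "a1b2"},
  %{id: 3, contact_id: 11, node_uuid: "c3d4"}
]

# Contact 10 has one finished context (id 1) and one active context (id 2).
active = ActiveContextSketch.active_context(contexts, 10)
IO.inspect(active.id)
```

A contact with no matching entry yields `nil`, which mirrors the `t() | nil` return type in the spec.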

changeset(context, attrs)

@spec changeset(t(), map()) :: Ecto.Changeset.t()

Standard changeset pattern we use for all data types

create_flow_context(attrs \\ %{})

@spec create_flow_context(map()) :: {:ok, t()} | {:error, Ecto.Changeset.t()}

Create a FlowContext

delete_completed_flow_contexts(back \\ 2)

@spec delete_completed_flow_contexts(non_neg_integer()) :: :ok

Delete all contexts that were completed more than `back` days ago (two days by default).
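
The cutoff logic can be sketched without a database. The example below assumes that `back` is measured in days and that a context counts as completed once `completed_at` is set; `CleanupSketch` and `keep_recent/3` are hypothetical names, not part of Glific.

```elixir
defmodule CleanupSketch do
  @seconds_per_day 86_400

  # Returns the contexts that would survive the cleanup: those still running
  # (completed_at is nil) or completed within the last `back` days.
  def keep_recent(contexts, now, back \\ 2) do
    cutoff = DateTime.add(now, -back * @seconds_per_day, :second)

    Enum.reject(contexts, fn ctx ->
      ctx.completed_at != nil and DateTime.compare(ctx.completed_at, cutoff) == :lt
    end)
  end
end

now = ~U[2024-01-10 00:00:00Z]

contexts = [
  %{id: 1, completed_at: ~U[2024-01-05 00:00:00Z]},
  %{id: 2, completed_at: ~U[2024-01-09 12:00:00Z]},
  %{id: 3, completed_at: nil}
]

# Only context 1 was completed before the two-day cutoff (2024-01-08).
contexts |> CleanupSketch.keep_recent(now) |> Enum.map(& &1.id) |> IO.inspect()
# prints [2, 3]
```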

delete_old_flow_contexts(back \\ 7)

@spec delete_old_flow_contexts(non_neg_integer()) :: :ok

Delete all contexts that are older than `back` days (7 days by default).

execute(context, messages)

@spec execute(t(), [Glific.Messages.Message.t()]) ::
  {:ok | :wait, t(), [Glific.Messages.Message.t()]} | {:error, String.t()}

Execute one (or more) steps in a flow based on the message stream

@spec get_vars_to_parse(t()) :: map()

A single place to parse the variable in a string related to flows.

init_context(flow, contact, status, opts \\ [])

@spec init_context(
  Glific.Flows.Flow.t(),
  Glific.Contacts.Contact.t(),
  String.t(),
  Keyword.t() | []
) ::
  {:ok | :wait, t(), [String.t()]} | {:error, String.t()}

Start a new context. If a context already exists, discard it.

load_context(context, flow)

@spec load_context(t(), Glific.Flows.Flow.t()) :: t()

Load the context object, given a flow object and a contact. At some point, we'll get the GenServer to cache this.

mark_flows_complete(contact_id, arg2, opts \\ [])

@spec mark_flows_complete(non_neg_integer(), boolean(), Keyword.t()) :: nil

Set all the flows for a specific context to be completed

match_outbound(context, body, go_back \\ 6)

@spec match_outbound(t(), String.t(), integer()) :: integer()

Count the number of times we have sent the same message in the recent past
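
A minimal sketch of this kind of count is shown below. It assumes that each outbound entry is a map with hypothetical `body` and `sent_at` keys and that `go_back` is a number of hours; neither detail is stated in this documentation, so treat both as illustration only.

```elixir
defmodule MatchOutboundSketch do
  # Counts entries whose body equals `body` and whose timestamp falls within
  # the last `go_back` hours (the unit is an assumption made for this sketch).
  def match_outbound(recent_outbound, body, now, go_back \\ 6) do
    since = DateTime.add(now, -go_back * 3600, :second)

    Enum.count(recent_outbound, fn item ->
      item.body == body and DateTime.compare(item.sent_at, since) != :lt
    end)
  end
end

now = ~U[2024-01-10 12:00:00Z]

recent = [
  %{body: "Hello", sent_at: ~U[2024-01-10 11:30:00Z]},
  %{body: "Hello", sent_at: ~U[2024-01-10 08:00:00Z]},
  %{body: "Hello", sent_at: ~U[2024-01-09 08:00:00Z]},
  %{body: "Bye", sent_at: ~U[2024-01-10 11:00:00Z]}
]

# Two "Hello" messages fall inside the six-hour window ending at `now`.
IO.inspect(MatchOutboundSketch.match_outbound(recent, "Hello", now))
# prints 2
```

A caller could compare the count against a threshold to detect a flow that keeps sending the same message.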

notification(context, message)

@spec notification(t(), String.t()) :: nil

Generate a notification having all the flow context data.

parse_context_string(context, str)

@spec parse_context_string(t(), String.t()) :: String.t()

A single place to parse the variable in a string related to flows.

reset_all_contexts(context, message)

@spec reset_all_contexts(t(), String.t()) :: t() | nil

Resets all the contexts for the user when we hit an error. This can prevent an infinite loop if flows are connected in a cycle.

@spec reset_context(t()) :: t() | nil

Resets the context and sends control back to the parent context if one exists
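
The hand-off to a parent can be pictured with a small in-memory model. In the sketch below, contexts live in a map keyed by id, "resetting" means stamping `completed_at` and clearing `node_uuid`, and the function returns the parent (or `nil`) so that the caller can continue there. These details and the name `ResetSketch` are assumptions, not a description of Glific's code.

```elixir
defmodule ResetSketch do
  # `store` is a map of id => context map (a hypothetical stand-in for the DB).
  # Marks the context as finished and returns {updated_store, parent_or_nil}.
  def reset_context(store, id, now) do
    ctx = Map.fetch!(store, id)
    store = Map.put(store, id, %{ctx | completed_at: now, node_uuid: nil})

    # `if` without an else branch yields nil when there is no parent_id.
    parent = if ctx.parent_id, do: Map.get(store, ctx.parent_id)

    {store, parent}
  end
end

store = %{
  1 => %{id: 1, parent_id: nil, node_uuid: "n1", completed_at: nil},
  2 => %{id: 2, parent_id: 1, node_uuid: "n2", completed_at: nil}
}

# Finishing the child context (id 2) hands control back to its parent (id 1).
{_store, parent} = ResetSketch.reset_context(store, 2, ~U[2024-01-10 00:00:00Z])
IO.inspect(parent.id)
# prints 1
```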

reset_one_context(context, opts \\ [])

@spec reset_one_context(t(), Keyword.t()) :: t()

Reset this context, but don't follow the parent context tail. This is used for tail call optimization.

resume_contact_flow(contact, flow_id, result, message \\ nil)

@spec resume_contact_flow(
  non_neg_integer() | t() | nil,
  Glific.Messages.Message.t() | nil
) :: {:ok, t() | nil, [String.t()]} | {:error, String.t()} | nil

Resume the flow for a given contact and a given flow id if still active

seed_context(flow, contact, status, opts \\ [])

@spec seed_context(
  Glific.Flows.Flow.t(),
  Glific.Contacts.Contact.t(),
  String.t(),
  Keyword.t() | []
) ::
  {:ok, t()} | {:error, Ecto.Changeset.t()}

Seed the context and set the wake up time as needed

@spec set_node(t(), Glific.Flows.Node.t()) :: t()

Set the new node for the context

step_forward(context, message)

@spec step_forward(t(), Glific.Messages.Message.t()) ::
  {:ok, map()} | {:error, String.t()}

Given an input string, consume the input and advance the state of the context

update_recent(context, msg, type)

@spec update_recent(t(), map(), atom()) :: t()

Update the recent_* state as we consume or send a message
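
One plausible shape for this bookkeeping is a bounded, newest-first list. The sketch below assumes that `type` is either `:recent_inbound` or `:recent_outbound` and that the list is capped at ten entries; the cap, the atoms, and the module name `RecentSketch` are hypothetical.

```elixir
defmodule RecentSketch do
  # Hypothetical cap on how many recent messages are remembered.
  @max_recent 10

  # Prepends `msg` to the chosen list and drops the oldest entries beyond the cap.
  def update_recent(context, msg, type) when type in [:recent_inbound, :recent_outbound] do
    Map.update!(context, type, fn list -> Enum.take([msg | list], @max_recent) end)
  end
end

context = %{recent_inbound: [], recent_outbound: [%{body: "hi"}]}

context
|> RecentSketch.update_recent(%{body: "how are you?"}, :recent_outbound)
|> IO.inspect()
```

Keeping the list newest-first makes the "recent past" checks cheap, since they only need to scan the head of the list.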

update_results(context, result)

@spec update_results(t(), map() | nil) :: t()

Update the contact results with each element of the json map
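
Conceptually, this is a merge of the new map into the stored results. The sketch below shows that idea on a plain map; treating `nil` as a no-op and the name `ResultsSketch` are assumptions, and the real function may persist the change as well.

```elixir
defmodule ResultsSketch do
  # A nil result leaves the context untouched (assumed behaviour).
  def update_results(context, nil), do: context

  # Each key of `result` is added to, or overwrites, the stored results.
  def update_results(context, result) when is_map(result) do
    %{context | results: Map.merge(context.results || %{}, result)}
  end
end

context = %{id: 1, results: %{"name" => "Asha"}}

context
|> ResultsSketch.update_results(%{"age" => "30", "name" => "Asha K"})
|> IO.inspect()
```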

@spec wakeup_flows(non_neg_integer()) :: any()

Find all the contexts which need to be woken up and processed
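
The selection step can be sketched as a filter over candidate contexts. The example assumes that a context is due when `wakeup_at` is set, is not in the future, and the context has not completed; `WakeupSketch.due/2` is a hypothetical helper that works on plain maps rather than database rows.

```elixir
defmodule WakeupSketch do
  # Returns the contexts whose wake-up time has arrived and that are still open.
  def due(contexts, now) do
    Enum.filter(contexts, fn ctx ->
      ctx.wakeup_at != nil and is_nil(ctx.completed_at) and
        DateTime.compare(ctx.wakeup_at, now) != :gt
    end)
  end
end

now = ~U[2024-01-10 12:00:00Z]

contexts = [
  %{id: 1, wakeup_at: ~U[2024-01-10 11:00:00Z], completed_at: nil},
  %{id: 2, wakeup_at: ~U[2024-01-10 13:00:00Z], completed_at: nil},
  %{id: 3, wakeup_at: nil, completed_at: nil},
  %{id: 4, wakeup_at: ~U[2024-01-10 10:00:00Z], completed_at: ~U[2024-01-10 10:30:00Z]}
]

# Only context 1 is both overdue and still open.
contexts |> WakeupSketch.due(now) |> Enum.map(& &1.id) |> IO.inspect()
# prints [1]
```

Each context returned this way could then be handed to a per-context step such as `wakeup_one`.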

wakeup_one(context, message \\ nil)

@spec wakeup_one(t(), Glific.Messages.Message.t() | nil) ::
  {:ok, t() | nil, [String.t()]} | {:error, String.t()} | nil

Process one context at a time that is ready to be woken