Jido.Messaging.Ingest (Jido Messaging v1.0.0)

Copy Markdown View Source

Inbound message processing pipeline.

Handles incoming messages from channels:

  1. Resolves/creates room by external binding
  2. Resolves/creates participant by external ID
  3. Builds normalized Message struct
  4. Persists message via adapter
  5. Returns message with context for handler processing

Usage

case Ingest.ingest_incoming(MyApp.Messaging, TelegramChannel, "bot_123", incoming_data) do
  {:ok, message, context} ->
    # message is persisted, context contains room/participant info
  {:error, reason} ->
    # handle error
end

Summary

Functions

Process an incoming message from a channel with ingest policy options.

Process an incoming message without deduplication check.

Process an incoming message without deduplication check and with ingest policy options.

Types

context()

@type context() :: Jido.Messaging.Context.t()

incoming()

@type incoming() :: Jido.Chat.Incoming.t() | map()

ingest_error()

@type ingest_error() :: policy_denial() | security_denial() | term()

ingest_opts()

@type ingest_opts() :: keyword()

policy_denial()

@type policy_denial() :: {:policy_denied, policy_stage(), atom(), String.t()}

policy_stage()

@type policy_stage() :: :gating | :moderation

security_denial()

@type security_denial() :: Jido.Messaging.Security.security_denial()

Functions

ingest_incoming(messaging_module, channel_module, bridge_id, incoming)

@spec ingest_incoming(module(), module(), String.t(), incoming()) ::
  {:ok, Jido.Messaging.Message.t(), context()}
  | {:ok, :duplicate}
  | {:error, ingest_error()}

Process an incoming message from a channel.

Returns {:ok, message, context} on success where:

  • message is the persisted Message struct
  • context contains room, participant, and channel info for reply handling

Returns {:ok, :duplicate} if the message has already been processed.

ingest_incoming(messaging_module, channel_module, bridge_id, incoming, opts)

@spec ingest_incoming(module(), module(), String.t(), incoming(), ingest_opts()) ::
  {:ok, Jido.Messaging.Message.t(), context()}
  | {:ok, :duplicate}
  | {:error, ingest_error()}

Process an incoming message from a channel with ingest policy options.

Options

  • :gaters - List of modules implementing Jido.Messaging.Gating behaviour
  • :gating_opts - Keyword options passed to each gater
  • :gating_timeout_ms - Timeout per gater check (default: 50)
  • :moderators - List of modules implementing Jido.Messaging.Moderation behaviour
  • :moderation_opts - Keyword options passed to each moderator
  • :moderation_timeout_ms - Timeout per moderator check (default: 50)
  • :policy_timeout_fallback - Timeout fallback policy (:deny or :allow_with_flag)
  • :policy_error_fallback - Crash/error fallback policy (:deny or :allow_with_flag)
  • :security - Runtime overrides for Jido.Messaging.Security config
  • :require_mention - Require MsgContext.was_mentioned to be true
  • :allowed_prefixes - Allowed command prefixes for parsed commands
  • :mention_targets - Mention targets used to normalize was_mentioned
  • :command_prefixes - Command parser prefix candidates
  • :command_max_text_bytes - Max message size for command parsing
  • :mentions_max_text_bytes - Max message size for mention adapter parsing

ingest_incoming!(messaging_module, channel_module, bridge_id, incoming)

@spec ingest_incoming!(module(), module(), String.t(), incoming()) ::
  {:ok, Jido.Messaging.Message.t(), context()} | {:error, ingest_error()}

Process an incoming message without deduplication check.

Use this when you've already verified the message is not a duplicate, or when deduplication is handled externally.

ingest_incoming!(messaging_module, channel_module, bridge_id, incoming, opts)

@spec ingest_incoming!(module(), module(), String.t(), incoming(), ingest_opts()) ::
  {:ok, Jido.Messaging.Message.t(), context()} | {:error, ingest_error()}

Process an incoming message without deduplication check and with ingest policy options.