OpenaiExPipeline.Pipeline (openai_ex_pipeline v0.0.1)

View Source

Pipeline functions for API operations.

Summary

Functions

Cleans up resources by deleting vector stores, files, and responses.

Confirms that all files have been processed by the vector store.

Creates a response using the OpenAI API.

Creates a vector store with the given name.

Retrieves the output messages from the pipeline token.

Initializes the resources map with default values.

Merges two resource maps together, combining lists and preserving other values.

Joins a list of names using Oxford comma style.

Removes conversation entries at the specified index or range.

Removes an output message at the specified index.

Updates the data field in the pipeline resources.

Uploads multiple files to the vector store in parallel.

Functions

cleanup_resources(token)

Cleans up resources by deleting vector stores, files, and responses.

Parameters

  • token: The pipeline token containing resources to clean up The token should contain:
    • openai_client: The OpenAI client instance
    • responses: List of responses to delete
    • files: Map of files to delete
    • vector_store: Vector store to delete

Returns

  • The original token after cleanup operations
  • Note: This function performs cleanup operations but does not modify the token's structure

Examples

iex> token = {:ok, %{openai_client: client, responses: [response], files: %{file1: file}, vector_store: store}}
iex> Pipeline.cleanup_resources(token)
{:ok, %{...}}

confirm_vector_store_processing(error)

Confirms that all files have been processed by the vector store.

Parameters

  • token: The pipeline token containing resources

Returns

  • {:error, error} if the token is an error
  • {:ok, resources} if there are no files or no vector store
  • {:ok, updated_resources} with processed files

create_response(error, input, api_options, options \\ %{})

Creates a response using the OpenAI API.

Parameters

  • token: The pipeline token containing resources
  • input: The input for the API call (function or list)
  • api_options: Options for the API call
  • options: Additional options (defaults to %{})

Returns

  • {:error, error} if the token is an error or input is invalid
  • {:ok, updated_resources} with the API response

create_vector_store(error, vector_store_name)

@spec create_vector_store(
  {:error, any()}
  | {:ok,
     %{
       :openai_client => any(),
       :vector_store => any(),
       optional(any()) => any()
     }},
  any()
) ::
  {:error, any()}
  | {:ok,
     %{
       :openai_client => any(),
       :vector_store => any(),
       optional(any()) => any()
     }}

Creates a vector store with the given name.

Parameters

  • token: The pipeline token containing resources
  • vector_store_name: The name for the new vector store

Returns

  • {:error, error} if the token is an error or vector store already exists
  • {:ok, updated_resources} with the new vector store

get_output(arg)

Retrieves the output messages from the pipeline token.

Parameters

  • token: The pipeline token containing resources

Returns

  • {:error, error} if the token contains an error
  • {:ok, output_messages} with the list of output messages on success

Examples

iex> token = {:ok, %{output_messages: ["message1", "message2"]}}
iex> Pipeline.get_output(token)
{:ok, ["message1", "message2"]}

iex> token = {:error, %{error: "Something went wrong"}}
iex> Pipeline.get_output(token)
{:error, "Something went wrong"}

init_resources(openai_client)

Initializes the resources map with default values.

Parameters

  • openai_client: The OpenAI client instance

Returns

A map containing initialized resources

merge_resources(resources, new_resources)

Merges two resource maps together, combining lists and preserving other values.

Parameters

  • resources: The base resources map
  • new_resources: The new resources map to merge with

Returns

A merged resources map where:

  • :conversation lists are concatenated
  • :output_messages lists are concatenated
  • :responses lists are concatenated
  • All other values are taken from the base resources map

Examples

iex> base = %{conversation: [1, 2], output_messages: ["a"], responses: [%{}], files: %{}}
iex> new = %{conversation: [3, 4], output_messages: ["b"], responses: [%{}], files: %{}}
iex> Pipeline.merge_resources(base, new)
%{conversation: [1, 2, 3, 4], output_messages: ["a", "b"], responses: [%{}, %{}], files: %{}}

oxford_join(names)

Joins a list of names using Oxford comma style.

Parameters

  • names: List of names to join

Returns

A string with the names joined using Oxford comma style

remove_conversation(token, range_val)

Removes conversation entries at the specified index or range.

Parameters

  • token: The pipeline token containing resources
  • range_val: Either an index or a Range of indices to remove

Returns

  • {:error, token} if the token is an error
  • {:ok, updated_resources} with the conversation entries removed

remove_output_messages(token, index)

Removes an output message at the specified index.

Parameters

  • token: The pipeline token containing resources
  • index: The index of the output message to remove

Returns

  • {:error, token} if the token is an error
  • {:ok, updated_resources} with the output message removed

update_data(error, data_to_update)

Updates the data field in the pipeline resources.

Parameters

  • token: The pipeline token containing resources
  • data_to_update: Map of data to merge into the existing data field

Returns

  • {:error, error} if the token contains an error
  • {:ok, updated_resources} with the merged data on success

Examples

iex> token = {:ok, %{data: %{key1: "value1"}}}
iex> Pipeline.update_data(token, %{key2: "value2"})
{:ok, %{data: %{key1: "value1", key2: "value2"}}}

iex> token = {:error, %{error: "Something went wrong"}}
iex> Pipeline.update_data(token, %{key: "value"})
{:error, %{error: "Something went wrong"}}

upload_file(token, file_label, file_name, options \\ %{async_connection: false})

Uploads a file to the vector store.

Parameters

  • token: The pipeline token containing resources
  • file_label: The label for the file
  • file_name: The path to the file
  • options: Map of options (defaults to %{async_connection: false})

Returns

  • {:error, error} if the token is an error or file doesn't exist
  • {:ok, updated_resources} with the uploaded file

upload_files(token, file_list, options \\ %{async_connection: false})

Uploads multiple files to the vector store in parallel.

Parameters

  • token: The pipeline token containing resources
  • file_list: List of maps where each map has :label and :file keys Example: [%{label: :my_label, file: "path/to/file.md", optional: false}]
  • options: Map of options (defaults to %{async_connection: false})

Returns

  • {:ok, updated_resources} on success
  • {:error, error_resources} on failure

upload_optional_file(token, file_label, file_name, options \\ %{async_connection: false})

Uploads an optional file to the vector store.

Parameters

  • token: The pipeline token containing resources
  • file_label: The label for the file
  • file_name: The path to the file
  • options: Map of options (defaults to %{async_connection: false})

Returns

  • {:error, error} if the token is an error
  • {:ok, resources} if the file is blank
  • {:ok, updated_resources} with the uploaded file

upload_optional_files(token, incoming_files, options \\ %{async_connection: false})

Uploads multiple optional files to the vector store.

Parameters

  • token: The pipeline token containing resources
  • incoming_files: List of file paths to upload
  • options: Map of options (defaults to %{async_connection: false})

Returns

  • {:error, error} if the token is an error
  • {:ok, updated_resources} with the uploaded files

upload_output_message_as_file(token, file_key, output_message_index, options \\ %{async_connection: false, remove_from_conversation: nil, remove_from_output_messages?: false})

Uploads an output message as a file to OpenAI's file storage.

Takes an output message from the pipeline's output_messages list and creates a temporary file with that content, then uploads it to OpenAI.

WARNING: The file_key parameter will be used as both the temporary filename and the filename for the OpenAI upload. It must have a file extension that OpenAI accepts (e.g. .txt, .md, .json).

Parameters

  • token: The pipeline token containing resources
  • file_key: The filename to use for the temporary and uploaded file (e.g. "epics.md")
  • output_message_index: Index of the output message to upload
  • options: Optional map of parameters
    • :async_connection - If true, will not wait for vector store processing (default: false)
    • :remove_from_conversation - expects an index or a range of indexes to remove from the conversation (default: nil)
    • :remove_from_output_messages? - If true, removes message from output_messages after upload (default: false)

Returns

  • {:ok, updated_resources} on successful upload
  • {:error, reason} if upload fails or output_message_index is invalid