# OpenAI Realtime Integration with Membrane WebRTC

```elixir
File.cd(__DIR__)
Logger.configure(level: :info)

Mix.install([
  {:membrane_core, "~> 1.1"},
  {:membrane_webrtc_plugin, "~> 0.22.0"},
  {:membrane_opus_plugin, "~> 0.20.4"},
  {:membrane_raw_audio_parser_plugin, "~> 0.4.0"},
  {:membrane_realtimer_plugin, "~> 0.10.0"},
  {:kino_membrane, "~> 0.3.0"},
  {:websockex, "~> 0.4.3"},
  {:jason, "~> 1.4"}
])
```

## Introduction

This demo shows how to use Membrane Framework to create a simple WebRTC based app that allows you to have a conversation with ChatGPT using the newest [OpenAI Realtime API](https://openai.com/index/introducing-the-realtime-api/).

## WebSocket handler

OpenAI Realtime API requires sending and receiving audio via the WebSocket. Let's create a module responsible for handling it with `WebSockex` library.

```elixir
defmodule OpenAIWebSocket do
  use WebSockex
  require Logger

  def start_link(opts) do
    WebSockex.start_link(
      "wss://api.openai.com/v1/realtime?model=gpt-4o-realtime-preview-2024-10-01",
      __MODULE__,
      %{parent: self()},
      opts
    )
  end

  @impl true
  def handle_frame(frame, state) do
    send(state.parent, {:websocket_frame, frame})
    {:ok, state}
  end

  def send_frame(ws, frame), do: WebSockex.send_frame(ws, {:text, frame})
end
```

## Membrane Components

Then, we will create a Membrane Element that will receive and send raw audio frames via the WebSocket.

```elixir
defmodule OpenAIEndpoint do
  use Membrane.Endpoint
  require Membrane.Logger

  def_input_pad(:input, accepted_format: _any)
  def_output_pad(:output, accepted_format: _any, flow_control: :push)

  def_options(websocket_opts: [])

  @impl true
  def handle_init(_ctx, opts) do
    {:ok, ws} = OpenAIWebSocket.start_link(opts.websocket_opts)
    {[], %{ws: ws}}
  end

  @impl true
  def handle_playing(_ctx, state) do
    # format of the buffers sent in the line 36
    format = %Membrane.RawAudio{channels: 1, sample_rate: 24_000, sample_format: :s16le}
    {[stream_format: {:output, format}], state}
  end

  @impl true
  def handle_buffer(:input, buffer, _ctx, state) do
    audio = Base.encode64(buffer.payload)
    frame = %{type: "input_audio_buffer.append", audio: audio} |> Jason.encode!()
    :ok = OpenAIWebSocket.send_frame(state.ws, frame)
    {[], state}
  end

  @impl true
  def handle_info({:websocket_frame, {:text, frame}}, _ctx, state) do
    case Jason.decode!(frame) do
      %{"type" => "response.audio.delta", "delta" => delta} ->
        audio_payload = Base.decode64!(delta)
        {[buffer: {:output, %Membrane.Buffer{payload: audio_payload}}], state}

      %{"type" => "response.audio.done"} ->
        {[event: {:output, %Membrane.Realtimer.Events.Reset{}}], state}

      %{"type" => "response.audio_transcript.done", "transcript" => transcript} ->
        Membrane.Logger.info("AI transcription: #{transcript}")
        {[], state}

      %{} ->
        {[], state}
    end
  end
end
```

Now, let's write a Pipeline module that exchanges the media with the browser using `Membrane.WebRTC.Source` and `Sink` and with OpenAI server using `OpenAIEndpoint`.

Because WebRTC requires and provides audio in OPUS format and OpenAI Realtime API uses raw audio, we have to spawn the proper encoder and decoder between WebRTC and OpenAI elements.

```elixir
defmodule OpenAIPipeline do
  use Membrane.Pipeline

  @impl true
  def handle_init(_ctx, opts) do
    spec =
      child(:webrtc_source, %Membrane.WebRTC.Source{
        signaling: {:websocket, port: opts[:webrtc_source_ws_port]}
      })
      |> via_out(:output, options: [kind: :audio])
      |> child(:input_opus_parser, Membrane.Opus.Parser)
      |> child(:opus_decoder, %Membrane.Opus.Decoder{sample_rate: 24_000})
      |> child(:open_ai, %OpenAIEndpoint{websocket_opts: opts[:openai_ws_opts]})
      |> child(:raw_audio_parser, %Membrane.RawAudioParser{overwrite_pts?: true})
      |> via_in(:input, target_queue_size: 1_000_000_000, toilet_capacity: 1_000_000_000)
      |> child(:realtimer, Membrane.Realtimer)
      |> child(:opus_encoder, Membrane.Opus.Encoder)
      |> via_in(:input, options: [kind: :audio])
      |> child(:webrtc_sink, %Membrane.WebRTC.Sink{
        tracks: [:audio],
        signaling: {:websocket, port: opts[:webrtc_sink_ws_port]}
      })

    {[spec: spec], %{}}
  end
end
```

## Getting OpenAI API key from the env

Let's set the WebSocket options (remember to set `OPENAI_API KEY` env).

```elixir
openai_api_key = System.get_env("OPENAI_API_KEY")

if openai_api_key == nil do
  raise "You have to set OPENAI_API_KEY env"
end

openai_ws_opts = [
  extra_headers: [
    {"Authorization", "Bearer " <> openai_api_key},
    {"OpenAI-Beta", "realtime=v1"}
  ]
]

:ok
```

## Running the server

Now, let's start the pipeline.

```elixir
{:ok, _supervisor, pipeline} =
  Membrane.Pipeline.start_link(OpenAIPipeline,
    openai_ws_opts: openai_ws_opts,
    webrtc_source_ws_port: 8829,
    webrtc_sink_ws_port: 8831
  )

:inets.start()

:inets.start(:httpd,
  bind_address: ~c"localhost",
  port: 8000,
  document_root: ~c"#{__DIR__}/assets",
  server_name: ~c"webrtc",
  server_root: "/tmp"
)

Process.monitor(pipeline)

receive do
  {:DOWN, _ref, :process, ^pipeline, _reason} -> :ok
end
```

Enter <http://localhost:8000/index.html> from the new tab of Google Chrome and start your conversation with the AI!

Transcription of AI answers will be available in the logs of the cell below.
