A streaming audio input that can be appended to.
Audio chunks are held in an Agent-backed queue, so a producer can push audio data to the input while the pipeline consumes it.
Example
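    alias Codex.Voice.Input.StreamedAudioInput

    input = StreamedAudioInput.new()

    # Producer task: push chunks, then close to signal the end of input
    producer =
      Task.async(fn ->
        for chunk <- audio_source do
          StreamedAudioInput.add(input, chunk)
        end

        StreamedAudioInput.close(input)
      end)

    # Consumer: iterate until the stream is closed and drained
    for chunk <- StreamedAudioInput.stream(input) do
      process(chunk)
    end

    # Collect the producer's reply so it does not linger in the mailbox
    Task.await(producer)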
Summary
Functions
add/2 - Add an audio chunk to the stream.
close/1 - Close the stream, signaling no more data will be added.
get/1 - Get the next chunk from the stream.
new/0 - Create a new streamed audio input.
stream/1 - Stream audio chunks until the stream is closed.
Types
@type t() :: %Codex.Voice.Input.StreamedAudioInput{queue: pid()}
Functions
add/2
Add an audio chunk to the stream.
Examples
    iex> input = Codex.Voice.Input.StreamedAudioInput.new()
    iex> Codex.Voice.Input.StreamedAudioInput.add(input, <<0, 0>>)
    :ok
@spec close(t()) :: :ok
Close the stream, signaling no more data will be added.
After calling close, consumers will receive :eof after consuming
all remaining chunks.
get/1
Get the next chunk from the stream.
Returns:
{:ok, binary} - The next audio chunk
:eof - The stream has been closed
:empty - No data currently available (try again later)
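The three return values can be handled with a single `case`. The sketch below is a hypothetical helper (`DrainSketch` is not part of the library) that collects every chunk available right now and reports whether it stopped on `:empty` or `:eof`. It takes a zero-arity `get_fun` so that it runs without the library; in real use that would presumably be `fn -> StreamedAudioInput.get(input) end`.

```elixir
defmodule DrainSketch do
  # Hypothetical helper, not part of Codex.Voice.Input.StreamedAudioInput.
  # `get_fun` is assumed to return {:ok, chunk}, :empty, or :eof,
  # mirroring the documented return values of get/1.
  def drain(get_fun, acc \\ []) do
    case get_fun.() do
      # A chunk is available: keep it and try again immediately.
      {:ok, chunk} ->
        drain(get_fun, [chunk | acc])

      # Nothing more right now (:empty) or ever (:eof): stop and report why.
      status when status in [:empty, :eof] ->
        {Enum.reverse(acc), status}
    end
  end
end
```

A caller can batch whatever has arrived and use the returned status to decide whether to come back later (`:empty`) or finish (`:eof`).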
@spec new() :: t()
Create a new streamed audio input.
Starts a stream queue process to manage the audio chunk queue.
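To make the documented semantics concrete, here is a minimal sketch of how an Agent-backed chunk queue could behave. `QueueSketch` is a hypothetical stand-in written for illustration; it is not the library's implementation, and details such as the state shape and whether `add` is accepted after `close` are assumptions.

```elixir
defmodule QueueSketch do
  # Hypothetical stand-in for illustration only.
  # Agent state is {erlang_queue_of_chunks, closed?}.

  def new do
    {:ok, pid} = Agent.start_link(fn -> {:queue.new(), false} end)
    pid
  end

  # Append a chunk to the back of the queue.
  # Assumption: chunks added after close/1 are still accepted.
  def add(pid, chunk) when is_binary(chunk) do
    Agent.update(pid, fn {q, closed?} -> {:queue.in(chunk, q), closed?} end)
  end

  # Mark the queue closed; chunks already queued stay readable.
  def close(pid) do
    Agent.update(pid, fn {q, _closed?} -> {q, true} end)
  end

  # Pop the next chunk, or report :eof (closed and drained) or :empty.
  def get(pid) do
    Agent.get_and_update(pid, fn {q, closed?} = state ->
      case :queue.out(q) do
        {{:value, chunk}, rest} -> {{:ok, chunk}, {rest, closed?}}
        {:empty, _} when closed? -> {:eof, state}
        {:empty, _} -> {:empty, state}
      end
    end)
  end
end
```

In this sketch `:eof` is returned only once the queue is both closed and drained, which matches the documented behavior that consumers receive `:eof` after consuming all remaining chunks.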
@spec stream(t()) :: Enumerable.t()
Stream audio chunks until the stream is closed.
This returns a Stream that yields audio chunks as they become
available, completing when the stream is closed.
Note: While the stream is empty but not yet closed, this polls with
a 10 ms delay between attempts. For high-performance use cases,
consider calling get/1 directly with your own polling strategy.
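One possible custom strategy is exponential backoff: retry quickly at first, then wait longer while the stream stays empty. The sketch below is a hypothetical helper (`PollingSketch`, with its `:min_delay_ms` and `:max_delay_ms` options, is not part of the library). It accepts a zero-arity `get_fun` assumed to return the same values as get/1, for example `fn -> StreamedAudioInput.get(input) end`.

```elixir
defmodule PollingSketch do
  # Hypothetical helper, not part of the library.
  # Builds a lazy stream of chunks from `get_fun`, sleeping with
  # exponential backoff whenever `get_fun` returns :empty.
  def stream(get_fun, opts \\ []) do
    min_ms = Keyword.get(opts, :min_delay_ms, 1)
    max_ms = Keyword.get(opts, :max_delay_ms, 50)

    # The accumulator is the delay to use on the next :empty result.
    Stream.unfold(min_ms, fn delay -> next(get_fun, delay, min_ms, max_ms) end)
  end

  defp next(get_fun, delay, min_ms, max_ms) do
    case get_fun.() do
      # Emit the chunk and reset the backoff to its minimum.
      {:ok, chunk} ->
        {chunk, min_ms}

      # Closed and drained: end the stream.
      :eof ->
        nil

      # Nothing yet: wait, then retry with a doubled (capped) delay.
      :empty ->
        Process.sleep(delay)
        next(get_fun, min(delay * 2, max_ms), min_ms, max_ms)
    end
  end
end
```

The caps are a trade-off: a small minimum keeps latency low when chunks arrive steadily, while the maximum bounds how long a newly arrived chunk can wait after a quiet period.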