HLS.Packager (HTTP Live Streaming (HLS) library v3.0.8)
A fully functional HLS packager that performs pure state transformations.
Instead of performing I/O operations, this module returns Actions that the caller
must execute. This design provides:
- Pure functions (no side effects)
- Testability without mocking
- Explicit control flow
- Caller-controlled concurrency
- Ability to batch operations
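The pattern behind these points can be sketched without the library. The example below is a minimal illustration, not the packager's implementation: `CoreSketch` and `ShellSketch` are hypothetical names, actions are plain tuples, and the "storage" is a list so the script runs on its own.

```elixir
defmodule CoreSketch do
  # Pure core: takes a state, returns {new_state, actions}. No I/O happens here.
  def put(state, name) do
    index = state.next_index
    action = {:upload, "#{name}_#{index}"}
    {%{state | next_index: index + 1}, [action]}
  end
end

defmodule ShellSketch do
  # Impure shell: the only place where side effects would happen.
  # The store is a plain list (hypothetical stand-in for real storage).
  def run(actions, store) do
    Enum.reduce(actions, store, fn {:upload, key}, acc -> [key | acc] end)
  end
end

state = %{next_index: 1}
{state, actions1} = CoreSketch.put(state, "seg")
{_state, actions2} = CoreSketch.put(state, "seg")

# Batching: the caller decides when the accumulated actions are executed.
store = ShellSketch.run(actions1 ++ actions2, [])
IO.inspect(Enum.reverse(store))
```

Because the core never touches storage, it can be tested by asserting on the returned actions alone, and the caller is free to run them sequentially, concurrently, or in batches.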
Usage Flow
# 1. Initialize
{:ok, state} = Packager.new(
  manifest_uri: URI.new!("stream.m3u8"),
  max_segments: 10
)

# 2. Add tracks
{state, []} = Packager.add_track(state, "video",
  stream: %VariantStream{...},
  segment_extension: ".m4s",
  target_segment_duration: 6
)

# 3. Add segment (returns upload action)
{state, [action]} = Packager.put_segment(state, "video", duration: 5.2, pts: 0)
# action = %Action.UploadSegment{
#   id: "video_seg_1",
#   uri: URI.parse("stream_video/00000/stream_video_00001.m4s"),
#   track_id: "video"
# }

# 4. Caller uploads the segment with their payload
:ok = Storage.put(storage, action.uri, video_payload)

# 5. Confirm upload (may trigger playlist writes)
{state, actions} = Packager.confirm_upload(state, action.id)
# actions = [%Action.WritePlaylist{uri: ..., content: ...}]

# 6. Execute write actions
Enum.each(actions, fn
  %Action.WritePlaylist{uri: uri, content: content} ->
    Storage.put(storage, uri, content)
end)

# 7. Sync (triggers media playlist updates)
{state, actions} = Packager.sync(state, 3)
# actions = [
#   %Action.WritePlaylist{type: :media, ...},
#   %Action.WritePlaylist{type: :master, ...},
#   %Action.DeleteSegment{uri: ...} # if sliding window
# ]
Actions
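On success, state-transforming operations return {new_state, [Action.t()]}, and the
caller must execute the actions in order. As the specs below show, some operations
may instead return {:warning, error, state} or {:error, error, state}.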
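A caller-side executor could look like the sketch below. It is an illustration under stated assumptions: the `Sketch.*` structs are local stand-ins that only mirror the `uri` and `content` fields shown in this page's examples, and the store is an in-memory map keyed by URI string rather than real storage.

```elixir
defmodule Sketch.WritePlaylist do
  # Stand-in for a playlist write action (hypothetical, not the library's struct).
  defstruct [:uri, :content]
end

defmodule Sketch.DeleteSegment do
  # Stand-in for a segment delete action (hypothetical, not the library's struct).
  defstruct [:uri]
end

defmodule Sketch.Executor do
  alias Sketch.{DeleteSegment, WritePlaylist}

  # Applies the actions left to right, so the order returned by the packager is kept.
  def run(actions, store) when is_list(actions) do
    Enum.reduce(actions, store, &apply_action/2)
  end

  defp apply_action(%WritePlaylist{uri: uri, content: content}, store) do
    Map.put(store, uri, content)
  end

  defp apply_action(%DeleteSegment{uri: uri}, store) do
    Map.delete(store, uri)
  end
end

store = %{"seg_1.m4s" => "old segment bytes"}

actions = [
  %Sketch.WritePlaylist{uri: "video.m3u8", content: "#EXTM3U"},
  %Sketch.DeleteSegment{uri: "seg_1.m4s"}
]

store = Sketch.Executor.run(actions, store)
IO.inspect(store)
```

Reducing over the list keeps execution strictly ordered; a caller that wants concurrency would have to decide for itself which actions are safe to reorder.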
Summary
Functions
add_track/3 - Adds a track to the packager. Returns updated state with no actions.
confirm_init_upload/2 - Confirms that an init section upload completed.
confirm_upload/2 - Confirms that a segment upload completed.
discontinue/1 - Marks all tracks so that a discontinuity is added to their next segment.
flush/1 - Flushes the packager, finalizing all playlists.
new/1 - Creates a new packager state.
next_sync_point/1 - Returns the next synchronization point.
put_init_section/2 - Prepares a new init section upload.
put_segment/3 - Adds a segment to a track and returns an upload action.
resume/1 - Resumes from existing playlists loaded by the caller.
skip_sync_point/2 - Skips a sync point across all tracks and schedules a discontinuity.
sync/2 - Synchronizes all tracks to the specified segment index.
sync_ready?/3 - Checks whether selected tracks have enough available segments to sync.
Types
@type t() :: %HLS.Packager{
  manifest_uri: URI.t(),
  master_written?: boolean(),
  max_segments: pos_integer() | nil,
  pending_discontinuities: list(),
  skipped_sync_points: %{required(pos_integer()) => MapSet.t(track_id())},
  timeline_reference: DateTime.t(),
  timing_tolerance_ns: non_neg_integer(),
  tracks: %{required(track_id()) => HLS.Packager.Track.t()}
}
@type track_id() :: String.t()
Functions
add_track/3
Adds a track to the packager. Returns updated state with no actions.
Tracks can only be added before the master playlist is written.
Examples
{state, []} = Packager.add_track(state, "video_720p",
  stream: %VariantStream{
    bandwidth: 2_500_000,
    resolution: {1280, 720},
    codecs: ["avc1.64001f", "mp4a.40.2"]
  },
  segment_extension: ".m4s",
  target_segment_duration: 6.0,
  codecs: ["avc1.64001f"],
  mandatory?: true
)
confirm_init_upload/2
Confirms that an init section upload completed.
@spec confirm_upload(t(), String.t()) :: {t(), [HLS.Packager.Action.WritePlaylist.t()]} | {:warning, HLS.Packager.Error.t(), t()}
Confirms that a segment upload completed.
May return playlist write actions if enough segments are buffered.
Examples
{state, actions} = Packager.confirm_upload(state, "video_seg_1")
# actions = [
#   %Action.WritePlaylist{type: :pending, uri: ..., content: ...}
# ]
@spec discontinue(t()) :: {t(), []} | {:error, HLS.Packager.Error.t(), t()}
Marks all tracks so that a discontinuity is added to their next segment.
@spec flush(t()) :: {t(), [HLS.Packager.Action.t()]}
Flushes the packager, finalizing all playlists.
When max_segments is nil (unlimited):
- Creates VOD playlists with EXT-X-ENDLIST
- Returns write actions for final playlists
When max_segments is set (sliding window):
- Returns delete actions for all segments and playlists
- Resets state to initial condition
Examples
# VOD mode
{state, actions} = Packager.flush(state)
# actions = [
#   %Action.WritePlaylist{type: :media, ...}, # VOD playlist
#   %Action.WritePlaylist{type: :media, ...},
#   %Action.DeletePlaylist{type: :pending, ...},
# ]

# Sliding window mode
{state, actions} = Packager.flush(sliding_state)
# actions = [
#   %Action.DeleteSegment{...},
#   %Action.DeleteSegment{...},
#   %Action.DeletePlaylist{type: :media, ...},
#   %Action.DeletePlaylist{type: :master, ...},
# ]
new/1
Creates a new packager state.
Options
- :manifest_uri (required) - URI of the master playlist
- :max_segments - Maximum segments per media playlist (nil = unlimited)
- :timing_tolerance_ms - Allowed timing drift in milliseconds (default: 200)
Examples
{:ok, state} = Packager.new(
  manifest_uri: URI.new!("stream.m3u8"),
  max_segments: 10
)
@spec next_sync_point(t()) :: pos_integer()
Returns the next synchronization point.
This is the maximum segment count across all tracks plus one.
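As a worked example of that rule, the sketch below computes the value from a map of per-track segment counts. The map shape and the track names are assumptions made for illustration; the real state keeps this information inside its track structs.

```elixir
# Hypothetical per-track segment counts.
segment_counts = %{"video" => 4, "audio" => 3}

# Maximum count across all tracks, plus one. Reducing from 0 means an
# empty set of tracks yields 1 in this sketch.
next_sync_point = Enum.reduce(Map.values(segment_counts), 0, &max/2) + 1

IO.inspect(next_sync_point)
```

With four video segments and three audio segments, the next sync point is 5.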
@spec put_init_section(t(), track_id()) :: {t(), [HLS.Packager.Action.UploadInitSection.t()]}
Prepares a new init section upload.
Returns an action to upload the init section. The caller must upload the payload
and then call confirm_init_upload/2.
Examples
{state, [action]} = Packager.put_init_section(state, "video")
# action = %Action.UploadInitSection{
#   id: "video_init_1",
#   uri: URI.parse("stream_video/00000/stream_video_00001_init.mp4"),
#   track_id: "video"
# }
# Caller uploads
:ok = Storage.put(storage, action.uri, init_payload)
# Confirm
{state, []} = Packager.confirm_init_upload(state, action.id)
@spec put_segment(t(), track_id(), keyword()) :: {t(), [HLS.Packager.Action.UploadSegment.t()]} | {:warning, HLS.Packager.Error.t(), t()} | {:error, HLS.Packager.Error.t(), t()}
Adds a segment to a track and returns an upload action.
The caller must upload the segment payload and then call confirm_upload/2.
The caller must provide PTS in nanoseconds; DTS may be provided for video and
is used instead of PTS when present.
Returns an error when RFC 8216 compliance issues are detected.
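Since the spec allows three result shapes, callers will typically branch on them. The sketch below shows one way to do that with function clauses. `PutSegmentResultSketch` is a hypothetical helper, the error terms are stand-in atoms rather than HLS.Packager.Error structs, and the reaction chosen for each branch is an assumption, not behaviour prescribed by the library.

```elixir
defmodule PutSegmentResultSketch do
  # Success: a 2-tuple of state and a list of upload actions.
  def handle({state, actions}) when is_list(actions) do
    {:upload, actions, state}
  end

  # Warning: the state is still returned, so this sketch chooses to log
  # and carry on (an assumption about what a caller might want).
  def handle({:warning, error, state}) do
    {:log_and_continue, error, state}
  end

  # Error: the segment was rejected. A caller would likely follow up with
  # skip_sync_point/2, as described for that function.
  def handle({:error, error, state}) do
    {:rejected, error, state}
  end
end

ok = PutSegmentResultSketch.handle({%{}, [:stand_in_upload_action]})
warned = PutSegmentResultSketch.handle({:warning, :stand_in_warning, %{}})
rejected = PutSegmentResultSketch.handle({:error, :stand_in_error, %{}})

IO.inspect({ok, warned, rejected})
```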
Examples
{state, [action]} = Packager.put_segment(state, "video", duration: 5.2, pts: 0)
# Caller uploads
:ok = Storage.put(storage, action.uri, segment_payload)
# Confirm (may trigger playlist writes)
{state, actions} = Packager.confirm_upload(state, action.id)
resume/1
Resumes from existing playlists loaded by the caller.
The caller is responsible for loading the master playlist and all media playlists. This function reconstructs the state from the loaded data.
Examples
# Caller loads playlists
master = load_master_playlist(master_uri)

media_playlists =
  Enum.map(master.streams, fn stream ->
    load_media_playlist(stream.uri)
  end)

# Resume
{:ok, state} = Packager.resume(
  master_playlist: master,
  media_playlists: media_playlists,
  max_segments: 10
)
@spec skip_sync_point(t(), pos_integer()) :: {t(), []} | {:error, HLS.Packager.Error.t(), t()}
Skips a sync point across all tracks and schedules a discontinuity.
Callers should invoke this when a segment is rejected for an RFC 8216 compliance violation, so that all tracks drop the same segment index before continuing.
@spec sync(t(), pos_integer()) :: {t(), [HLS.Packager.Action.t()]} | {:warning, [HLS.Packager.Error.t()], t()} | {:error, HLS.Packager.Error.t(), t()}
Synchronizes all tracks to the specified segment index.
Moves segments from pending playlists to media playlists and returns actions to write the updated playlists. May also return delete actions if the sliding window is enabled (max_segments is set).
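The sliding-window side of this can be pictured with a small sketch. It is an assumption about the general technique, not a description of the packager's internals: `WindowSketch.trim/2` is a hypothetical helper that keeps the newest `max_segments` entries and reports the rest as candidates for deletion.

```elixir
defmodule WindowSketch do
  # With no limit (nil), nothing is removed.
  def trim(segments, nil), do: {segments, []}

  # With a limit, the oldest entries beyond the window are split off.
  # Returns {kept, removed}; segments are assumed to be ordered oldest first.
  def trim(segments, max_segments) when is_integer(max_segments) and max_segments > 0 do
    excess = max(length(segments) - max_segments, 0)
    {removed, kept} = Enum.split(segments, excess)
    {kept, removed}
  end
end

{kept, removed} = WindowSketch.trim(["s1", "s2", "s3", "s4", "s5"], 3)
IO.inspect({kept, removed})
```

In this picture each removed entry would correspond to one delete action in the returned list.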
Examples
{state, actions} = Packager.sync(state, 5)
# actions = [
#   %Action.WritePlaylist{type: :media, ...},
#   %Action.WritePlaylist{type: :media, ...},
#   %Action.WritePlaylist{type: :master, ...},
#   %Action.DeleteSegment{...}, # if sliding window
# ]
@spec sync_ready?(t(), pos_integer(), (track_id(), HLS.Packager.Track.t() -> boolean())) :: {boolean(), [track_id()]}
Checks whether selected tracks have enough available segments to sync.
By default, only variant (video) tracks are checked.
Returns {ready?, lagging_track_ids}.
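The shape of the result can be illustrated with a self-contained sketch. Everything here is hypothetical: tracks are plain maps with `available` and `variant?` keys, "enough" is taken to mean `available >= sync_point`, and the default selector only approximates the variant-only default described above.

```elixir
defmodule ReadySketch do
  # tracks: map of track_id => %{available: count, variant?: boolean}
  # selector: decides which tracks take part in the check.
  def sync_ready?(tracks, sync_point, selector \\ fn _id, track -> track.variant? end) do
    lagging =
      for {id, track} <- tracks,
          selector.(id, track),
          track.available < sync_point,
          do: id

    lagging = Enum.sort(lagging)
    {lagging == [], lagging}
  end
end

tracks = %{
  "video" => %{available: 3, variant?: true},
  "audio" => %{available: 2, variant?: false}
}

# Default selector: only the variant track is checked.
default_result = ReadySketch.sync_ready?(tracks, 3)

# Custom selector: check every track.
all_result = ReadySketch.sync_ready?(tracks, 3, fn _id, _track -> true end)

IO.inspect({default_result, all_result})
```

A caller can use the list of lagging track ids to decide whether to wait for more segments or to skip the sync point.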