HLS.Packager (HTTP Live Streaming (HLS) library v3.0.8)

View Source

A fully functional HLS packager that performs pure state transformations.

Instead of performing I/O operations, this module returns Actions that the caller must execute. This design provides:

  • Pure functions (no side effects)
  • Testability without mocking
  • Explicit control flow
  • Caller-controlled concurrency
  • Ability to batch operations

Usage Flow

# 1. Initialize
{:ok, state} = Packager.new(
  manifest_uri: URI.new!("stream.m3u8"),
  max_segments: 10
)

# 2. Add tracks
{state, []} = Packager.add_track(state, "video",
  stream: %VariantStream{...},
  segment_extension: ".m4s",
  target_segment_duration: 6
)

# 3. Add segment (returns upload action)
{state, [action]} = Packager.put_segment(state, "video", duration: 5.2, pts: 0)
# action = %Action.UploadSegment{
#   id: "video_seg_1",
#   uri: URI.parse("stream_video/00000/stream_video_00001.m4s"),
#   track_id: "video"
# }

# 4. Caller uploads the segment with their payload
:ok = Storage.put(storage, action.uri, video_payload)

# 5. Confirm upload (may trigger playlist writes)
{state, actions} = Packager.confirm_upload(state, action.id)
# actions = [%Action.WritePlaylist{uri: ..., content: ...}]

# 6. Execute write actions
Enum.each(actions, fn
  %Action.WritePlaylist{uri: uri, content: content} ->
    Storage.put(storage, uri, content)
end)

# 7. Sync (triggers media playlist updates)
{state, actions} = Packager.sync(state, 3)
# actions = [
#   %Action.WritePlaylist{type: :media, ...},
#   %Action.WritePlaylist{type: :master, ...},
#   %Action.DeleteSegment{uri: ...}  # if sliding window
# ]

Actions

All operations return {new_state, [Action.t()]} where actions must be executed by the caller in order.

Summary

Functions

Adds a track to the packager. Returns updated state with no actions.

Confirms that an init section upload completed.

Confirms that a segment upload completed.

Marks all tracks to add discontinuity to next segment.

Flushes the packager, finalizing all playlists.

Creates a new packager state.

Returns the next synchronization point.

Prepares a new init section upload.

Adds a segment to a track and returns an upload action.

Resumes from existing playlists loaded by the caller.

Skips a sync point across all tracks and schedules a discontinuity.

Synchronizes all tracks to the specified segment index.

Checks whether selected tracks have enough available segments to sync.

Types

t()

@type t() :: %HLS.Packager{
  manifest_uri: URI.t(),
  master_written?: boolean(),
  max_segments: pos_integer() | nil,
  pending_discontinuities: list(),
  skipped_sync_points: %{required(pos_integer()) => MapSet.t(track_id())},
  timeline_reference: DateTime.t(),
  timing_tolerance_ns: non_neg_integer(),
  tracks: %{required(track_id()) => HLS.Packager.Track.t()}
}

track_id()

@type track_id() :: String.t()

Functions

add_track(state, track_id, opts)

@spec add_track(t(), track_id(), keyword()) :: {t(), []}

Adds a track to the packager. Returns updated state with no actions.

Tracks can only be added before the master playlist is written.

Examples

{state, []} = Packager.add_track(state, "video_720p",
  stream: %VariantStream{
    bandwidth: 2_500_000,
    resolution: {1280, 720},
    codecs: ["avc1.64001f", "mp4a.40.2"]
  },
  segment_extension: ".m4s",
  target_segment_duration: 6.0,
  codecs: ["avc1.64001f"],
  mandatory?: true
)

confirm_init_upload(state, upload_id)

@spec confirm_init_upload(t(), String.t()) :: {t(), []}

Confirms that an init section upload completed.

confirm_upload(state, upload_id)

@spec confirm_upload(t(), String.t()) ::
  {t(), [HLS.Packager.Action.WritePlaylist.t()]}
  | {:warning, HLS.Packager.Error.t(), t()}

Confirms that a segment upload completed.

May return playlist write actions if enough segments are buffered.

Examples

{state, actions} = Packager.confirm_upload(state, "video_seg_1")
# actions = [
#   %Action.WritePlaylist{type: :pending, uri: ..., content: ...}
# ]

discontinue(state)

@spec discontinue(t()) :: {t(), []} | {:error, HLS.Packager.Error.t(), t()}

Marks all tracks to add discontinuity to next segment.

flush(state)

@spec flush(t()) :: {t(), [HLS.Packager.Action.t()]}

Flushes the packager, finalizing all playlists.

When max_segments is nil (unlimited):

  • Creates VOD playlists with EXT-X-ENDLIST
  • Returns write actions for final playlists

When max_segments is set (sliding window):

  • Returns delete actions for all segments and playlists
  • Resets state to initial condition

Examples

# VOD mode
{state, actions} = Packager.flush(state)
# actions = [
#   %Action.WritePlaylist{type: :media, ...},  # VOD playlist
#   %Action.WritePlaylist{type: :media, ...},
#   %Action.DeletePlaylist{type: :pending, ...},
# ]

# Sliding window mode
{state, actions} = Packager.flush(sliding_state)
# actions = [
#   %Action.DeleteSegment{...},
#   %Action.DeleteSegment{...},
#   %Action.DeletePlaylist{type: :media, ...},
#   %Action.DeletePlaylist{type: :master, ...},
# ]

new(opts)

@spec new(keyword()) :: {:ok, t()} | {:error, term()}

Creates a new packager state.

Options

  • :manifest_uri (required) - URI of the master playlist
  • :max_segments - Maximum segments per media playlist (nil = unlimited)
  • :timing_tolerance_ms - Allowed timing drift in milliseconds (default: 200)

Examples

{:ok, state} = Packager.new(
  manifest_uri: URI.new!("stream.m3u8"),
  max_segments: 10
)

next_sync_point(state)

@spec next_sync_point(t()) :: pos_integer()

Returns the next synchronization point.

This is the maximum segment count across all tracks plus one.

put_init_section(state, track_id)

@spec put_init_section(t(), track_id()) ::
  {t(), [HLS.Packager.Action.UploadInitSection.t()]}

Prepares a new init section upload.

Returns an action to upload the init section. The caller must upload the payload and then call confirm_init_upload/2.

Examples

{state, [action]} = Packager.put_init_section(state, "video")
# action = %Action.UploadInitSection{
#   id: "video_init_1",
#   uri: URI.parse("stream_video/00000/stream_video_00001_init.mp4"),
#   track_id: "video"
# }

# Caller uploads
:ok = Storage.put(storage, action.uri, init_payload)

# Confirm
{state, []} = Packager.confirm_init_upload(state, action.id)

put_segment(state, track_id, opts)

@spec put_segment(t(), track_id(), keyword()) ::
  {t(), [HLS.Packager.Action.UploadSegment.t()]}
  | {:warning, HLS.Packager.Error.t(), t()}
  | {:error, HLS.Packager.Error.t(), t()}

Adds a segment to a track and returns an upload action.

The caller must upload the segment payload and then call confirm_upload/2. The caller must provide PTS in nanoseconds; DTS may be provided for video and is used instead of PTS when present.

Returns an error when RFC 8216 compliance issues are detected.

Examples

{state, [action]} = Packager.put_segment(state, "video", duration: 5.2, pts: 0)

# Caller uploads
:ok = Storage.put(storage, action.uri, segment_payload)

# Confirm (may trigger playlist writes)
{state, actions} = Packager.confirm_upload(state, action.id)

resume(opts)

@spec resume(keyword()) :: {:ok, t()} | {:error, term()}

Resumes from existing playlists loaded by the caller.

The caller is responsible for loading the master playlist and all media playlists. This function reconstructs the state from the loaded data.

Examples

# Caller loads playlists
master = load_master_playlist(master_uri)

media_playlists =
  Enum.map(master.streams, fn stream ->
    load_media_playlist(stream.uri)
  end)

# Resume
{:ok, state} = Packager.resume(
  master_playlist: master,
  media_playlists: media_playlists,
  max_segments: 10
)

skip_sync_point(state, sync_point)

@spec skip_sync_point(t(), pos_integer()) ::
  {t(), []} | {:error, HLS.Packager.Error.t(), t()}

Skips a sync point across all tracks and schedules a discontinuity.

Callers should invoke this when a segment is rejected for RFC compliance, so all tracks drop the same segment index before continuing.

sync(state, sync_point)

@spec sync(t(), pos_integer()) ::
  {t(), [HLS.Packager.Action.t()]}
  | {:warning, [HLS.Packager.Error.t()], t()}
  | {:error, HLS.Packager.Error.t(), t()}

Synchronizes all tracks to the specified segment index.

Moves segments from pending playlists to media playlists and returns actions to write updated playlists. May also return delete actions if sliding window is enabled.

Examples

{state, actions} = Packager.sync(state, 5)
# actions = [
#   %Action.WritePlaylist{type: :media, ...},
#   %Action.WritePlaylist{type: :media, ...},
#   %Action.WritePlaylist{type: :master, ...},
#   %Action.DeleteSegment{...},  # if sliding window
# ]

sync_ready?(state, sync_point, track_filter \\ &variant_track?/2)

@spec sync_ready?(t(), pos_integer(), (track_id(), HLS.Packager.Track.t() ->
                                   boolean())) ::
  {boolean(), [track_id()]}

Checks whether selected tracks have enough available segments to sync.

By default, only variant (video) tracks are checked. Returns {ready?, lagging_track_ids}.