PgLargeObjects.LargeObject (PgLargeObjects v0.2.0)


Low-level API for managing large objects.

This module defines a structure LargeObject which represents a large object in a PostgreSQL database that has been opened for reading or writing.

The function create/2 creates a new large object, while open/3 opens an existing one given its object ID. Both return a LargeObject structure to which other functions such as size/1 or write/2 can be applied.

Transactions Required

All operations on LargeObject values must take place within a database transaction, since the internal handle managed by the structure is only valid for the duration of a transaction.

Any large object value will be closed automatically at the end of the transaction.

Streaming

Since there is both an Enumerable as well as a Collectable implementation for this structure, Enum and Stream APIs can be used to interact with the object, e.g.

# Get 189th byte of object:
Repo.transaction(fn ->
  {:ok, lob} = LargeObject.open(Repo, object_id)
  Enum.at(lob, 188)
end)

# Stream object into a list of chunks:
Repo.transaction(fn ->
  {:ok, lob} = LargeObject.open(Repo, object_id)
  Enum.to_list(lob)
end)

Summary

Functions

Close a large object.

Create (and open) a large object.

Open a large object for reading or writing.

Read data from large object.

Remove a large object.

Resize large object.

Set read/write position in large object.

Get the size of a large object.

Get read/write position in large object.

Write data to a large object.

Types

t()

@type t() :: %PgLargeObjects.LargeObject{
  bufsize: non_neg_integer(),
  fd: non_neg_integer(),
  oid: pos_integer(),
  repo: Ecto.Repo.t()
}

Functions

close(lob)

@spec close(t()) :: :ok | {:error, :not_found}

Close a large object.

Frees any database resources associated with the given object lob.

Any large object descriptors that remain open at the end of a transaction will be closed automatically.

Return value

  • :ok on success.
  • {:error, :not_found} if the given large object is not open (e.g. because it was already closed, or deleted).

create(repo, opts \\ [])

@spec create(
  Ecto.Repo.t(),
  keyword()
) :: {:ok, t()}

Create (and open) a large object.

Creates a new large object in the database repo with a random object ID, and opens it for reading or writing.

The object will be closed automatically at the end of the transaction.

Options

See open/3 for a list of supported options.

Return value

  • {:ok, lob} where lob is a LargeObject structure.
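For illustration, a typical create-and-write round trip might look like the following sketch. It assumes a configured Ecto repo named Repo (as in the examples above); the :mode option is passed through to open/3.

```elixir
alias PgLargeObjects.LargeObject

# Create a new large object, write some data, and keep its object ID
# so it can be reopened in a later transaction.
{:ok, oid} =
  Repo.transaction(fn ->
    {:ok, lob} = LargeObject.create(Repo, mode: :write)
    :ok = LargeObject.write(lob, "hello, world")
    # The handle is closed automatically when the transaction ends.
    lob.oid
  end)
```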

open(repo, oid, opts \\ [])

@spec open(Ecto.Repo.t(), pos_integer(), keyword()) ::
  {:ok, t()} | {:error, :not_found}

Open a large object for reading or writing.

Opens an existing large object identified by the object identifier oid in the database repo.

The object will be closed automatically at the end of the transaction.

Options

  • :bufsize - number of bytes to transfer at a time when streaming into/out of the object. Defaults to 1MB.
  • :mode - can be one of :read, :write, :append or :read_write indicating whether to open the object for reading, writing, appending or reading and writing.

Return value

  • {:ok, lob} on success, where lob is a LargeObject structure.
  • {:error, :not_found} if the given oid does not reference a large object.
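As a sketch, opening an object for reading with a smaller streaming buffer could look like this (Repo and oid are assumed to be an Ecto repo and a previously obtained object ID):

```elixir
alias PgLargeObjects.LargeObject

Repo.transaction(fn ->
  # Use 64 KiB chunks when streaming instead of the 1 MB default.
  case LargeObject.open(Repo, oid, mode: :read, bufsize: 64 * 1024) do
    {:ok, lob} -> Enum.to_list(lob)
    {:error, :not_found} -> []
  end
end)
```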

read(lob, length)

@spec read(t(), non_neg_integer()) :: {:ok, binary()} | {:error, :not_found}

Read data from large object.

Reads length bytes of data from the given large object lob, starting at the current position in the object. Advances the position by the number of bytes read, or to the end of the file. The read position will not be advanced when the current position is already beyond the end of the file.

The data is not chunked but transferred in one go. For large amounts of data, avoid passing a large length; instead, consider streaming the data by leveraging the Enumerable implementation, e.g.

Repo.transaction(fn ->
    {:ok, lob} = LargeObject.open(Repo, object_id, [mode: :read])

    # Stream large object to local file.
    lob
    |> Stream.into(File.stream!("/tmp/recording.ogg"))
    |> Stream.run()
end)

Return value

  • {:ok, data} on success
  • {:error, :not_found} if the given large object is not open (e.g. because it was already closed, or deleted).
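For smaller reads, consecutive calls pick up where the previous one left off, since each read advances the position. A minimal sketch (Repo and oid assumed as before):

```elixir
alias PgLargeObjects.LargeObject

Repo.transaction(fn ->
  {:ok, lob} = LargeObject.open(Repo, oid, mode: :read)
  # Read the first 16 bytes, then the next 16; the position
  # advances automatically between the two calls.
  {:ok, first} = LargeObject.read(lob, 16)
  {:ok, second} = LargeObject.read(lob, 16)
  {first, second}
end)
```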

remove(repo, oid)

@spec remove(Ecto.Repo.t(), pos_integer()) :: :ok | {:error, :not_found}

Remove a large object.

Deletes a large object identified by oid from the database referenced by repo.

Return value

  • :ok on success.
  • {:error, :not_found} if the given oid does not reference a large object.
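Note that remove/2 takes an object ID rather than an open handle, so no open/3 call is needed first. A sketch, assuming Repo and oid as before:

```elixir
alias PgLargeObjects.LargeObject

Repo.transaction(fn ->
  case LargeObject.remove(Repo, oid) do
    :ok -> :deleted
    {:error, :not_found} -> :already_gone
  end
end)
```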

resize(lob, size)

@spec resize(t(), non_neg_integer()) :: :ok | {:error, :not_found}

Resize large object.

Truncates (or extends) the given large object lob to exactly size bytes.

If size is larger than the current size of the object, the object will be extended with null bytes (<<0>>).

Return value

  • :ok on success
  • {:error, :not_found} if the given large object is not open (e.g. because it was already closed, or deleted).
  • {:error, :read_only} if the given large object was not opened for writing.
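Since resizing modifies the object, it requires a writable mode. A sketch, assuming Repo and oid as before:

```elixir
alias PgLargeObjects.LargeObject

Repo.transaction(fn ->
  {:ok, lob} = LargeObject.open(Repo, oid, mode: :read_write)
  # Truncate to 1 KiB; an object shorter than that would instead
  # be extended with null bytes.
  :ok = LargeObject.resize(lob, 1024)
  LargeObject.size(lob)
end)
```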

seek(lob, offset, start \\ :start)

@spec seek(t(), integer(), :start | :current | :end) ::
  {:ok, non_neg_integer()} | {:error, :not_found}

Set read/write position in large object.

Sets the current position within the large object, to which read/2 and write/2 operations apply, to offset.

The offset value is interpreted depending on the start value, which can be one of three atoms:

  • :start - interpret offset as the number of bytes from the start of the object. The offset should be a non-negative value. Using the offset 0 moves the position to the first byte in the object.
  • :current - interpret offset as a value relative to the current position. The offset can be any integer. Using the offset 0 leaves the position unchanged.
  • :end - interpret offset as the number of bytes from the end of the object. The offset should be a non-positive value. Using the offset 0 moves the position to one byte past the last byte of the object.

The default start value is :start.

It is possible to seek past the end of the object, but it is not permitted to seek before the beginning of the object.

Return value

  • {:ok, new_position} on success
  • {:error, :not_found} if the given large object is not open (e.g. because it was already closed, or deleted).
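Combining seek/3 with read/2 allows random access; for example, the last bytes of an object can be read by seeking relative to :end. A sketch, assuming Repo and oid as before:

```elixir
alias PgLargeObjects.LargeObject

Repo.transaction(fn ->
  {:ok, lob} = LargeObject.open(Repo, oid, mode: :read)
  # Position 10 bytes before the end, then read those final 10 bytes.
  {:ok, _new_position} = LargeObject.seek(lob, -10, :end)
  LargeObject.read(lob, 10)
end)
```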

size(lob)

@spec size(t()) :: {:ok, non_neg_integer()} | {:error, :not_found}

Get the size of a large object.

Calculates the size (in bytes) of the given large object lob.

size/1 vs. Enum.count/1

Note that this is not the same as Enum.count/1, which, by virtue of the Enumerable implementation, returns the number of chunks in the given object, i.e. the number of times any streaming access would need to hit the database. The number of chunks is determined by the :bufsize option given to create/2 or open/3.

Return value

  • {:ok, size} on success, with size being the size of the object in bytes.
  • {:error, :not_found} if the given large object is not open (e.g. because it was already closed, or deleted).
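The distinction above can be made concrete in a sketch (Repo and oid assumed as before): with a 1 KiB buffer, Enum.count/1 reflects how many 1 KiB chunks streaming would fetch, while size/1 returns the byte count.

```elixir
alias PgLargeObjects.LargeObject

Repo.transaction(fn ->
  {:ok, lob} = LargeObject.open(Repo, oid, bufsize: 1024)
  {:ok, bytes} = LargeObject.size(lob)
  # Number of 1 KiB chunks, i.e. database round-trips when streaming.
  chunks = Enum.count(lob)
  {bytes, chunks}
end)
```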

tell(lob)

@spec tell(t()) :: {:ok, non_neg_integer()} | {:error, :not_found}

Get read/write position in large object.

Returns the current position within the large object to which read/2 and write/2 operations apply.

Return value

  • {:ok, position} on success
  • {:error, :not_found} if the given large object is not open (e.g. because it was already closed, or deleted).

write(lob, data)

@spec write(t(), binary()) :: :ok | {:error, :not_found} | {:error, :read_only}

Write data to a large object.

Writes the given binary data to the large object lob, starting at the current position in the object. May overwrite existing data, or extend the size of the object as needed. Advances the position in the large object by the number of bytes in data.

The data is not chunked but transferred in one go. For large amounts of data, consider streaming data by leveraging the Collectable implementation, e.g.

Repo.transaction(fn ->
    {:ok, lob} = LargeObject.open(Repo, object_id, [mode: :write])

    # Stream large file into the large object.
    File.stream!("/tmp/recording.ogg")
    |> Stream.into(lob)
    |> Stream.run()
end)

Return value

  • :ok on success
  • {:error, :not_found} if the given large object is not open (e.g. because it was already closed, or deleted).
  • {:error, :read_only} if the given large object was not opened for writing.
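For adding data to the end of an existing object without tracking positions manually, the :append mode from open/3 can be combined with write/2. A sketch, assuming Repo and oid as before:

```elixir
alias PgLargeObjects.LargeObject

Repo.transaction(fn ->
  # :append positions writes after the existing content.
  {:ok, lob} = LargeObject.open(Repo, oid, mode: :append)
  :ok = LargeObject.write(lob, "appended data")
end)
```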