Atex.Repo (atex v0.9.1)

View Source

AT Protocol repository - a signed, content-addressed store of records.

A repository is a key/value mapping of repo paths (collection/rkey) to records (CBOR objects), backed by a Merkle Search Tree (MST). Each published version of the tree is captured in a signed Atex.Repo.Commit.

Quick start

# Create a new empty repository
repo = Atex.Repo.new()

# Insert records (string path or Atex.Repo.Path struct)
{:ok, repo} = Atex.Repo.put_record(repo, "app.bsky.feed.post/3jzfcijpj2z2a", %{"text" => "hello"})

# Commit (sign) the current tree state
jwk = JOSE.JWK.generate_key({:ec, "P-256"})
{:ok, repo} = Atex.Repo.commit(repo, "did:plc:example", jwk)

# Export to a CAR file
{:ok, car_binary} = Atex.Repo.to_car(repo)

# Round-trip import
{:ok, repo2} = Atex.Repo.from_car(car_binary)

# Verify the commit signature
:ok = Atex.Repo.verify_commit(repo2, JOSE.JWK.to_public(jwk))

Paths

Record paths can be passed as plain strings ("collection/rkey") or as Atex.Repo.Path structs. Both are accepted by all path-taking functions. See Atex.Repo.Path for validation rules and struct API.

Record storage

Records are DRISL CBOR-encoded. Their CIDs (:drisl codec) are stored as leaf values in the MST. The raw record bytes are tracked in a separate blocks map inside the struct so they are available for CAR export without re-encoding.

CAR serialization

to_car/1 produces a CARv1 file in the streamable block order described in the spec: commit first, then MST nodes in depth-first pre-order, interleaved with their record blocks.

from_car/1 decodes a CAR file, extracts the signed commit from the first root CID, loads the MST, and collects all record blocks. It does not verify the commit signature - call verify_commit/2 explicitly.

stream_car/1 provides a lazy stream over a CAR binary, emitting {:commit, commit} then {:record, path, record} tuples without loading the full repository into memory. Requires a streamable-order CAR (commit first, MST nodes in pre-order before their records).

ATProto spec: https://atproto.com/specs/repository

Summary

Types

t()

An AT Protocol repository.

Functions

Signs the current tree state and stores the result as the repository commit.

Removes the record at path.

Decodes a CARv1 binary into a repository struct.

Retrieves the record at path, returning the decoded map.

Returns a deduplicated list of all collection names in the repository.

Returns a sorted list of all record keys within collection.

Returns a sorted list of {rkey, record_map} pairs for all records in collection.

Returns a new empty repository with no records and no commit.

Inserts or replaces the record at path.

Returns a lazy stream over a CARv1 chunk stream, emitting decoded items without loading the full repository into memory.

Exports the repository as a CARv1 binary.

Verifies the commit signature against the given public key.

Types

ordered_acc()

@type ordered_acc() :: {%{required(DASL.CID.t()) => binary()}, [DASL.CID.t()]}

t()

@type t() :: %Atex.Repo{
  blocks: %{required(DASL.CID.t()) => binary()},
  commit: Atex.Repo.Commit.t() | nil,
  tree: MST.Tree.t()
}

An AT Protocol repository.

Functions

commit(repo, did, signing_key)

@spec commit(t(), String.t(), JOSE.JWK.t()) :: {:ok, t()} | {:error, atom()}

Signs the current tree state and stores the result as the repository commit.

Builds an Atex.Repo.Commit for did referencing the current MST root, signs it with signing_key, and updates repo.commit. The rev is set to the current timestamp as a TID string, guaranteed to be monotonically increasing relative to any previous commit in this process.

Examples

iex> repo = Atex.Repo.new()
iex> jwk = JOSE.JWK.generate_key({:ec, "P-256"})
iex> {:ok, repo} = Atex.Repo.commit(repo, "did:plc:example", jwk)
iex> repo.commit.did
"did:plc:example"
iex> repo.commit.version
3

delete_record(repo, path)

@spec delete_record(t(), String.t() | Atex.Repo.Path.t()) ::
  {:ok, t()}
  | {:error,
     :not_found | :invalid_path | :invalid_collection | :invalid_rkey | atom()}

Removes the record at path.

path may be a "collection/rkey" string or an Atex.Repo.Path struct.

Returns {:error, :not_found} if the path does not exist.

Examples

iex> repo = Atex.Repo.new()
iex> {:ok, repo} = Atex.Repo.put_record(repo, "app.bsky.feed.post/3jzfcijpj2z2a", %{"text" => "hi"})
iex> {:ok, repo} = Atex.Repo.delete_record(repo, "app.bsky.feed.post/3jzfcijpj2z2a")
iex> Atex.Repo.get_record(repo, "app.bsky.feed.post/3jzfcijpj2z2a")
{:error, :not_found}

from_car(binary)

@spec from_car(binary()) :: {:ok, t()} | {:error, atom()}

Decodes a CARv1 binary into a repository struct.

The first root CID in the CAR header must point to a valid signed commit block. The MST is reconstructed from the remaining :drisl codec blocks. Record blocks are collected into repo.blocks.

The commit signature is not verified. Call verify_commit/2 explicitly if you need to authenticate the repository.

Examples

iex> repo = Atex.Repo.new()
iex> {:ok, repo} = Atex.Repo.put_record(repo, "app.bsky.feed.post/3jzfcijpj2z2a", %{"text" => "hello"})
iex> jwk = JOSE.JWK.generate_key({:ec, "P-256"})
iex> {:ok, repo} = Atex.Repo.commit(repo, "did:plc:example", jwk)
iex> {:ok, bin} = Atex.Repo.to_car(repo)
iex> {:ok, repo2} = Atex.Repo.from_car(bin)
iex> repo2.commit.did
"did:plc:example"

get_record(repo, path)

@spec get_record(t(), String.t() | Atex.Repo.Path.t()) ::
  {:ok, map()}
  | {:error,
     :not_found | :invalid_path | :invalid_collection | :invalid_rkey | atom()}

Retrieves the record at path, returning the decoded map.

path may be a "collection/rkey" string or an Atex.Repo.Path struct.

Returns {:error, :not_found} if the path does not exist.

Examples

iex> repo = Atex.Repo.new()
iex> {:ok, repo} = Atex.Repo.put_record(repo, "app.bsky.feed.post/3jzfcijpj2z2a", %{"text" => "hi"})
iex> {:ok, record} = Atex.Repo.get_record(repo, "app.bsky.feed.post/3jzfcijpj2z2a")
iex> record["text"]
"hi"

list_collections(repo)

@spec list_collections(t()) :: {:ok, [String.t()]} | {:error, atom()}

Returns a deduplicated list of all collection names in the repository.

Collections are returned in MST key order (bytewise-lexicographic on the full collection/rkey path string). This is generally close to but not identical to alphabetical order - for example, "foo.bar" sorts after "foo.bar.baz" because / (0x2F) > . (0x2E).

Examples

iex> repo = Atex.Repo.new()
iex> {:ok, repo} = Atex.Repo.put_record(repo, "app.bsky.feed.post/aaaa", %{})
iex> {:ok, repo} = Atex.Repo.put_record(repo, "app.bsky.feed.like/bbbb", %{})
iex> {:ok, repo} = Atex.Repo.put_record(repo, "app.bsky.feed.post/bbbb", %{})
iex> {:ok, cols} = Atex.Repo.list_collections(repo)
iex> cols
["app.bsky.feed.like", "app.bsky.feed.post"]

list_record_keys(repo, collection)

@spec list_record_keys(t(), String.t()) :: {:ok, [String.t()]} | {:error, atom()}

Returns a sorted list of all record keys within collection.

The list is in MST key order, which for TID-keyed records is chronological. Returns an empty list (not an error) when the collection exists in the repo but has no records, or does not exist at all.

Examples

iex> repo = Atex.Repo.new()
iex> {:ok, repo} = Atex.Repo.put_record(repo, "app.bsky.feed.post/aaaa", %{})
iex> {:ok, repo} = Atex.Repo.put_record(repo, "app.bsky.feed.post/bbbb", %{})
iex> {:ok, keys} = Atex.Repo.list_record_keys(repo, "app.bsky.feed.post")
iex> keys
["aaaa", "bbbb"]

list_records(repo, collection)

@spec list_records(t(), String.t()) :: {:ok, [{String.t(), map()}]} | {:error, atom()}

Returns a sorted list of {rkey, record_map} pairs for all records in collection.

The list is in MST key order. Returns an empty list when the collection does not exist or has no records.

Examples

iex> repo = Atex.Repo.new()
iex> {:ok, repo} = Atex.Repo.put_record(repo, "app.bsky.feed.post/aaaa", %{"n" => 1})
iex> {:ok, repo} = Atex.Repo.put_record(repo, "app.bsky.feed.post/bbbb", %{"n" => 2})
iex> {:ok, records} = Atex.Repo.list_records(repo, "app.bsky.feed.post")
iex> Enum.map(records, fn {rkey, _} -> rkey end)
["aaaa", "bbbb"]

new()

@spec new() :: t()

Returns a new empty repository with no records and no commit.

Examples

iex> repo = Atex.Repo.new()
iex> repo.commit
nil

put_record(repo, path, record)

@spec put_record(t(), String.t() | Atex.Repo.Path.t(), map()) ::
  {:ok, t()}
  | {:error, :invalid_path | :invalid_collection | :invalid_rkey | atom()}

Inserts or replaces the record at path.

path may be a "collection/rkey" string or an Atex.Repo.Path struct.

The record is DRISL CBOR-encoded and its CID computed. The CID is inserted into the MST as a leaf value. The commit is not updated - call commit/3 to sign the new tree state.

Examples

iex> repo = Atex.Repo.new()
iex> {:ok, repo} = Atex.Repo.put_record(repo, "app.bsky.feed.post/3jzfcijpj2z2a", %{"text" => "hi"})
iex> {:ok, record} = Atex.Repo.get_record(repo, "app.bsky.feed.post/3jzfcijpj2z2a")
iex> record["text"]
"hi"

stream_car(chunk_stream)

@spec stream_car(Enumerable.t()) :: Enumerable.t()

Returns a lazy stream over a CARv1 chunk stream, emitting decoded items without loading the full repository into memory.

chunk_stream must be an Enumerable that yields binary chunks of any size - for example File.stream!("repo.car", [], 65_536) or a chunked HTTP response body. Passing a plain binary also works but is equivalent to loading it into memory first; prefer from_car/1 in that case.

The stream emits:

  • {:commit, Atex.Repo.Commit.t()} - the first item, decoded from the CAR root block
  • {:record, Atex.Repo.Path.t(), map()} - one per record, decoded in the order they appear in the CAR

The CAR must be in streamable pre-order: commit block first, then MST nodes before their child nodes and records. This is the format produced by to_car/1 and by spec-compliant PDS exports. For CARs with arbitrary block ordering use from_car/1 instead.

If a record block is encountered before its parent MST node has been seen (i.e. the path cannot be resolved from already-decoded nodes), the stream emits {:error, :unresolvable_record, cid} and halts. Parse errors raise a RuntimeError (consistent with DASL.CAR.stream_decode/2 semantics).

Examples

From a file without loading it fully into memory:

File.stream!("repo.car", 65_536, [:raw, :binary])
|> Atex.Repo.stream_car()
|> Enum.each(fn
  {:commit, commit} -> IO.puts(commit.did)
  {:record, path, record} -> IO.inspect({to_string(path), record})
end)

From a binary (e.g. in tests):

iex> repo = Atex.Repo.new()
iex> {:ok, repo} = Atex.Repo.put_record(repo, "app.bsky.feed.post/aaaa", %{"n" => 1})
iex> jwk = JOSE.JWK.generate_key({:ec, "P-256"})
iex> {:ok, repo} = Atex.Repo.commit(repo, "did:plc:example", jwk)
iex> {:ok, bin} = Atex.Repo.to_car(repo)
iex> items = Atex.Repo.stream_car([bin]) |> Enum.to_list()
iex> match?([{:commit, _} | _], items)
true
iex> Enum.any?(items, &match?({:record, _, _}, &1))
true

Partial consumption with Stream.take/2 works without raising:

File.stream!("repo.car", 65_536, [:raw, :binary])
|> Atex.Repo.stream_car()
|> Stream.filter(&match?({:record, _, _}, &1))
|> Stream.take(10)
|> Enum.to_list()

to_car(repo)

@spec to_car(t()) :: {:ok, binary()} | {:error, :no_commit | atom()}

Exports the repository as a CARv1 binary.

Block ordering follows the streamable convention from the spec:

  1. The signed commit block.
  2. The MST root node, then MST nodes in depth-first pre-order, with each record block immediately following the MST entry that references it.

Returns {:error, :no_commit} if commit/3 has not been called.

Examples

iex> repo = Atex.Repo.new()
iex> {:ok, repo} = Atex.Repo.put_record(repo, "app.bsky.feed.post/3jzfcijpj2z2a", %{"text" => "hello"})
iex> jwk = JOSE.JWK.generate_key({:ec, "P-256"})
iex> {:ok, repo} = Atex.Repo.commit(repo, "did:plc:example", jwk)
iex> {:ok, bin} = Atex.Repo.to_car(repo)
iex> is_binary(bin)
true

verify_commit(repo, jwk)

@spec verify_commit(t(), JOSE.JWK.t()) :: :ok | {:error, :no_commit | atom()}

Verifies the commit signature against the given public key.

Delegates to Atex.Repo.Commit.verify/2.

Examples

iex> repo = Atex.Repo.new()
iex> jwk = JOSE.JWK.generate_key({:ec, "P-256"})
iex> {:ok, repo} = Atex.Repo.commit(repo, "did:plc:example", jwk)
iex> Atex.Repo.verify_commit(repo, JOSE.JWK.to_public(jwk))
:ok