Forge.AnvilBridge behaviour (Forge v0.1.1)


Behaviour for Anvil integration - publishing samples to Anvil for labeling.

Anvil is North-Shore-AI's human-in-the-loop labeling platform. This bridge enables Forge to export samples to Anvil queues and sync labels back.


Adapters

  • Mock - Test adapter with in-memory state
  • Direct - Direct Elixir calls (same BEAM cluster)
  • HTTP - REST API calls (separate deployment)
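All adapters share one contract, so an adapter is simply a module that implements the callbacks documented below. As an illustration only, here is a minimal in-memory sketch in the spirit of the Mock adapter. It is not Forge's actual implementation: the module name MyApp.InMemoryAnvil, the job ID format and the put_label/2 helper are hypothetical, and only publish_sample/2 and get_labels/1 are shown.

```elixir
# Hypothetical in-memory adapter sketch; not Forge's actual Mock adapter.
# A real adapter would add `@behaviour Forge.AnvilBridge` and implement
# every callback; that is omitted so the sketch runs without Forge.
defmodule MyApp.InMemoryAnvil do
  use Agent

  # State: %{queues: %{queue_name => [sample_id]}, labels: %{sample_id => [label]}}
  def start_link(_opts \\ []) do
    Agent.start_link(fn -> %{queues: %{}, labels: %{}} end, name: __MODULE__)
  end

  def publish_sample(sample, opts) do
    # :queue is documented as required, so fail loudly when it is missing.
    queue = Keyword.fetch!(opts, :queue)

    Agent.update(__MODULE__, fn state ->
      update_in(state, [:queues, Access.key(queue, [])], &[sample.id | &1])
    end)

    # The job ID format is invented for this sketch.
    {:ok, "job-" <> sample.id}
  end

  def get_labels(sample_id) do
    case Agent.get(__MODULE__, &Map.get(&1.labels, sample_id, [])) do
      [] -> {:error, :not_found}
      labels -> {:ok, labels}
    end
  end

  # Test-only helper (hypothetical): record a label as if an annotator submitted it.
  def put_label(sample_id, label) do
    Agent.update(__MODULE__, fn state ->
      update_in(state, [:labels, Access.key(sample_id, [])], &[label | &1])
    end)
  end
end
```

Keeping all state in a single Agent process is enough for tests; a real Mock adapter may store its state differently.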

Configuration

# config/config.exs
import Config

config :forge, :anvil_bridge_adapter, Forge.AnvilBridge.Direct
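For tests, the adapter can be switched per environment. This is a sketch that assumes the Mock adapter is named Forge.AnvilBridge.Mock, by analogy with Forge.AnvilBridge.Direct, and that config/config.exs loads the environment file (for example with import_config "#{config_env()}.exs"):

```elixir
# config/test.exs
# Assumed module name; check the adapter list of your Forge version.
import Config

config :forge, :anvil_bridge_adapter, Forge.AnvilBridge.Mock
```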

Examples

# Publish a sample
adapter = Application.get_env(:forge, :anvil_bridge_adapter)
{:ok, job_id} = adapter.publish_sample(sample, queue: "cns_narratives")

# Batch publish
{:ok, count} = adapter.publish_batch(samples, queue: "cns_narratives")

# Get labels
{:ok, labels} = adapter.get_labels(sample_id)

Summary

Callbacks

  • create_queue_for_pipeline/2 - Create an Anvil queue for pipeline samples.
  • get_labels/1 - Fetch labels for a sample from Anvil.
  • get_queue_stats/1 - Fetch queue statistics from Anvil.
  • publish_batch/2 - Batch publish samples to an Anvil labeling queue.
  • publish_sample/2 - Publish a single sample to an Anvil labeling queue.
  • sync_labels/2 - Sync labels from Anvil back to Forge sample metadata.

Functions

  • create_queue_for_pipeline/2 - Create an Anvil queue for pipeline samples.
  • get_labels/1 - Fetch labels for a sample from Anvil.
  • get_queue_stats/1 - Fetch queue statistics from Anvil.
  • publish_batch/2 - Batch publish samples to an Anvil labeling queue.
  • publish_sample/2 - Publish a sample to an Anvil labeling queue.
  • sample_to_dto/2 - Convert a Forge sample to Anvil-ready DTO format.
  • sync_labels/2 - Sync labels from Anvil back to Forge sample metadata.

Types

error()

@type error() ::
  :not_found
  | :timeout
  | :network
  | :unauthorized
  | :not_available
  | {:unexpected, term()}

job_id()

@type job_id() :: String.t()

queue_name()

@type queue_name() :: String.t()

sample_id()

@type sample_id() :: String.t()
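Because every callback can return {:error, error()}, callers can branch on the reason. The sketch below shows one possible policy for retrying transient failures. The module MyApp.AnvilErrors and the split between transient and permanent reasons are assumptions, since the exact semantics of each reason are not documented here.

```elixir
defmodule MyApp.AnvilErrors do
  # Transient failures: the same request may succeed if repeated later.
  # Treating :not_available as transient is an assumption of this sketch.
  def retryable?(:timeout), do: true
  def retryable?(:network), do: true
  def retryable?(:not_available), do: true
  # :not_found, :unauthorized and {:unexpected, term} will not be fixed by retrying.
  def retryable?(_reason), do: false

  # Calls `fun` up to `attempts` times, retrying only on retryable errors.
  def with_retry(fun, attempts) when is_integer(attempts) and attempts > 0 do
    case fun.() do
      {:error, reason} = error ->
        if attempts > 1 and retryable?(reason),
          do: with_retry(fun, attempts - 1),
          else: error

      result ->
        result
    end
  end
end
```

The loop retries immediately with no delay; production code would normally add a backoff between attempts.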

Callbacks

create_queue_for_pipeline(pipeline_id, opts)

@callback create_queue_for_pipeline(pipeline_id :: String.t(), opts :: keyword()) ::
  {:ok, queue_name()} | {:error, error()}

Create an Anvil queue for pipeline samples.

Parameters

  • pipeline_id - Forge pipeline UUID
  • opts - Options including:
    • :queue_name - Custom queue name (default: pipeline_id)
    • :description - Queue description
    • :priority - Queue priority (default: "normal")

Returns

  • {:ok, queue_name} - Created queue name
  • {:error, reason} - Failure

Examples

{:ok, "pipeline-abc-123"} = adapter.create_queue_for_pipeline(
  "pipeline-abc-123",
  description: "CNS narrative samples"
)
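The option defaults above can be resolved with Keyword.get/3. The hypothetical helper below (MyApp.QueueOpts is not part of Forge) only illustrates the documented defaults: :queue_name falls back to pipeline_id, which is why the example returns "pipeline-abc-123", and :priority falls back to "normal".

```elixir
defmodule MyApp.QueueOpts do
  # Apply the documented defaults; :description has no default and may be nil.
  def resolve(pipeline_id, opts) do
    %{
      queue_name: Keyword.get(opts, :queue_name, pipeline_id),
      description: Keyword.get(opts, :description),
      priority: Keyword.get(opts, :priority, "normal")
    }
  end
end
```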

get_labels(sample_id)

@callback get_labels(sample_id()) :: {:ok, [map()]} | {:error, error()}

Fetch labels for a sample from Anvil.

Parameters

  • sample_id - Forge sample UUID

Returns

  • {:ok, labels} - List of label maps with keys:
    • :label - The label value (e.g., "entailment", "neutral")
    • :annotator_id - UUID of the annotator
    • :confidence - Confidence score (0.0-1.0)
    • :created_at - Timestamp of annotation
  • {:error, :not_found} - Sample has no labels
  • {:error, reason} - Other failures

Examples

{:ok, labels} = adapter.get_labels("sample-123")
# [%{label: "entailment", annotator_id: "user-1", confidence: 1.0, ...}]
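When several annotators label the same sample, callers usually need to reduce the list to a single label. The sketch below sums the confidence scores per label and picks the highest total. MyApp.Consensus and the confidence-weighted vote are illustrative choices, not something Anvil or Forge prescribes.

```elixir
defmodule MyApp.Consensus do
  # No labels, no consensus.
  def consensus([]), do: nil

  def consensus(labels) do
    labels
    # %{"entailment" => [1.0, 0.5], "neutral" => [0.6]}
    |> Enum.group_by(& &1.label, & &1.confidence)
    # Pick the label whose confidence scores add up to the most.
    |> Enum.max_by(fn {_label, scores} -> Enum.sum(scores) end)
    |> elem(0)
  end
end
```

Ties are broken arbitrarily by map iteration order; a stricter policy might require a minimum level of agreement.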

get_queue_stats(queue_name)

@callback get_queue_stats(queue_name()) :: {:ok, map()} | {:error, error()}

Fetch queue statistics from Anvil.

Parameters

  • queue_name - Anvil queue name

Returns

  • {:ok, stats} - Map with keys:
    • :total - Total samples in queue
    • :completed - Completed samples
    • :pending - Pending samples
    • :in_progress - Samples being labeled
  • {:error, reason} - Failure

Examples

{:ok, stats} = adapter.get_queue_stats("cns_narratives")
# %{total: 500, completed: 47, pending: 450, in_progress: 3}
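The stats map lends itself to simple progress checks, for example deciding whether it is time to call sync_labels/2. A minimal sketch with a hypothetical MyApp.QueueProgress module, assuming the four documented keys are always present. For the stats above it reports 47 / 500 = 0.094 completion.

```elixir
defmodule MyApp.QueueProgress do
  # Fraction of queued samples that are completed; 0.0 for an empty queue.
  def completion(%{total: 0}), do: 0.0
  def completion(%{total: total, completed: completed}), do: completed / total

  # A queue is drained when nothing is pending or in progress.
  def drained?(%{pending: 0, in_progress: 0}), do: true
  def drained?(_stats), do: false
end
```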

publish_batch(samples, opts)

@callback publish_batch(samples :: [map()], opts :: keyword()) ::
  {:ok, non_neg_integer()} | {:error, error()}

Batch publish samples to an Anvil labeling queue.

More efficient than multiple publish_sample/2 calls.

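Parameters

  • samples - List of Forge samples to publish
  • opts - Options including:
    • :queue - Target Anvil queue name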

Returns

  • {:ok, count} - Number of samples published
  • {:error, reason} - Failure

Examples

# assuming `samples` holds 100 samples
{:ok, 100} = adapter.publish_batch(samples, queue: "cns_narratives")
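publish_batch/2 sends a whole list at once; for very large sample sets a caller may still want to split the work. The sketch below chunks the samples and sums the published counts, stopping at the first error. MyApp.Batching and its publish_fun parameter (a stand-in for adapter.publish_batch/2 so the code runs without Forge) are hypothetical.

```elixir
defmodule MyApp.Batching do
  # Publish `samples` in chunks of `size`, stopping at the first error.
  def publish_in_chunks(samples, size, publish_fun) do
    samples
    |> Enum.chunk_every(size)
    |> Enum.reduce_while({:ok, 0}, fn chunk, {:ok, total} ->
      case publish_fun.(chunk) do
        {:ok, count} -> {:cont, {:ok, total + count}}
        {:error, _reason} = error -> {:halt, error}
      end
    end)
  end
end
```

In an application, publish_fun would be something like fn chunk -> adapter.publish_batch(chunk, queue: "cns_narratives") end. Chunks published before a failure stay published, so the caller has to decide whether to retry the remainder or accept partial publication.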

publish_sample(sample, opts)

@callback publish_sample(sample :: map(), opts :: keyword()) ::
  {:ok, job_id()} | {:error, error()}

Publish a single sample to an Anvil labeling queue.

Parameters

  • sample - The Forge sample to publish
  • opts - Options including:
    • :queue - Target Anvil queue name (required)
    • :include_measurements - Include measurements in metadata (default: true)

Returns

  • {:ok, job_id} - Success with Anvil labeling job ID
  • {:error, reason} - Failure

Examples

sample = %Forge.Sample{id: "abc-123", data: %{"claim" => "Earth orbits Sun"}}
{:ok, job_id} = adapter.publish_sample(sample, queue: "cns_narratives")
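Since :queue is required and :include_measurements defaults to true, an adapter (or a caller) can normalize the options up front. A minimal sketch; MyApp.PublishOpts and the {:unexpected, {:missing_option, :queue}} error shape are assumptions, not the error Forge's adapters actually return.

```elixir
defmodule MyApp.PublishOpts do
  # :queue must be a string; :include_measurements defaults to true.
  def validate(opts) do
    with {:ok, queue} when is_binary(queue) <- Keyword.fetch(opts, :queue) do
      {:ok, %{queue: queue, include_measurements: Keyword.get(opts, :include_measurements, true)}}
    else
      # Missing or non-string :queue
      _ -> {:error, {:unexpected, {:missing_option, :queue}}}
    end
  end
end
```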

sync_labels(sample_id, opts)

@callback sync_labels(sample_id(), opts :: keyword()) ::
  {:ok, non_neg_integer()} | {:error, error()}

Sync labels from Anvil back to Forge sample metadata.

This is typically called after labeling is complete to enrich Forge samples with human annotations.

Parameters

  • sample_id - Forge sample UUID
  • opts - Options including:
    • :force - Re-sync even if labels already exist (default: false)

Returns

  • {:ok, count} - Number of labels synced
  • {:error, reason} - Failure

Examples

{:ok, 3} = adapter.sync_labels("sample-123", [])
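Conceptually, syncing merges the fetched labels into the sample's metadata while honoring :force. The pure function below sketches only that merge step. The :labels metadata key, the {:ok, count, sample} return shape and MyApp.LabelSync are hypothetical; the real callback returns just {:ok, count}.

```elixir
defmodule MyApp.LabelSync do
  # Merge `labels` into the sample's metadata under :labels. Without
  # `force: true`, a sample that already has labels is returned unchanged
  # with a count of 0. Uses Map functions so it also works on structs.
  def merge_labels(sample, labels, opts \\ []) do
    force? = Keyword.get(opts, :force, false)
    metadata = Map.get(sample, :metadata) || %{}
    existing = Map.get(metadata, :labels)

    if existing not in [nil, []] and not force? do
      {:ok, 0, sample}
    else
      {:ok, length(labels), Map.put(sample, :metadata, Map.put(metadata, :labels, labels))}
    end
  end
end
```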

Functions

create_queue_for_pipeline(pipeline_id, opts \\ [])

Create an Anvil queue for pipeline samples.

Delegates to the configured adapter.
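Each module-level function forwards to the adapter configured under :anvil_bridge_adapter. A sketch of that delegation pattern, assuming the adapter is read at call time with Application.fetch_env!/2 (Forge may use get_env/3 with a default instead); MyApp.Bridge and MyApp.FakeAdapter are hypothetical stand-ins.

```elixir
# Hypothetical stand-in adapter so the sketch runs without Forge.
defmodule MyApp.FakeAdapter do
  def create_queue_for_pipeline(pipeline_id, opts),
    do: {:ok, Keyword.get(opts, :queue_name, pipeline_id)}
end

defmodule MyApp.Bridge do
  # Read the adapter on every call so runtime config changes take effect.
  defp adapter, do: Application.fetch_env!(:forge, :anvil_bridge_adapter)

  def create_queue_for_pipeline(pipeline_id, opts \\ []),
    do: adapter().create_queue_for_pipeline(pipeline_id, opts)
end

Application.put_env(:forge, :anvil_bridge_adapter, MyApp.FakeAdapter)
{:ok, "pipeline-abc-123"} = MyApp.Bridge.create_queue_for_pipeline("pipeline-abc-123")
```

Reading the configuration on each call, rather than capturing it in a module attribute at compile time, lets tests swap adapters with Application.put_env/3.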

get_labels(sample_id)

Fetch labels for a sample from Anvil.

Delegates to the configured adapter.

get_queue_stats(queue_name)

Fetch queue statistics from Anvil.

Delegates to the configured adapter.

publish_batch(samples, opts \\ [])

Batch publish samples to an Anvil labeling queue.

Delegates to the configured adapter.

publish_sample(sample, opts \\ [])

Publish a sample to an Anvil labeling queue.

Delegates to the configured adapter.

sample_to_dto(sample, opts \\ [])

Convert a Forge sample to Anvil-ready DTO format.

Parameters

  • sample - Forge sample map/struct
  • opts - Options including:
    • :include_measurements - Include measurements in metadata (default: true)

Returns

Map with keys: :sample_id, :title, :body, :metadata

Examples

dto = Forge.AnvilBridge.sample_to_dto(sample, include_measurements: true)
# %{
#   sample_id: "abc-123",
#   title: "Claim: Earth orbits Sun",
#   body: "Narrative A: ...\n\nNarrative B: ...",
#   metadata: %{pipeline_id: "...", measurements: %{...}}
# }
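The conversion could look roughly like the sketch below. The sample shape is an assumption: it expects "claim", "narrative_a" and "narrative_b" under :data, with :pipeline_id and :measurements at the top level, which may not match Forge's actual sample fields. MyApp.DTO is hypothetical.

```elixir
defmodule MyApp.DTO do
  # Build the four documented keys: :sample_id, :title, :body, :metadata.
  def sample_to_dto(sample, opts \\ []) do
    data = Map.get(sample, :data, %{})
    metadata = %{pipeline_id: Map.get(sample, :pipeline_id)}

    # :include_measurements defaults to true, as documented above.
    metadata =
      if Keyword.get(opts, :include_measurements, true),
        do: Map.put(metadata, :measurements, Map.get(sample, :measurements, %{})),
        else: metadata

    %{
      sample_id: sample.id,
      title: "Claim: " <> Map.get(data, "claim", ""),
      body: "Narrative A: #{data["narrative_a"]}\n\nNarrative B: #{data["narrative_b"]}",
      metadata: metadata
    }
  end
end
```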

sync_labels(sample_id, opts \\ [])

Sync labels from Anvil back to Forge sample metadata.

Delegates to the configured adapter.