Edifice.Generative.Diffusion (Edifice v0.2.0)


Diffusion Policy: Action generation via denoising diffusion.

Implements Diffusion Policy from "Diffusion Policy: Visuomotor Policy Learning via Action Diffusion" (Chi et al., RSS 2023). Instead of directly predicting actions, we learn to denoise random noise into actions conditioned on observations.

Key Innovation: DDPM for Actions

  • Traditional policies: a = pi(o) - direct mapping
  • Diffusion Policy: a = denoise(noise | o) - iterative refinement

Training:
  1. Sample action sequence a_0 from data
  2. Add noise: a_t = sqrt(alpha_bar_t)*a_0 + sqrt(1-alpha_bar_t)*eps
  3. Predict noise: eps_hat = network(a_t, t, obs)
  4. Loss: ||eps - eps_hat||^2

Inference:
  1. Sample a_T ~ N(0, I)
  2. For t = T...1: a_{t-1} = denoise(a_t, t, obs)
  3. Return a_0
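
The inference steps above amount to a countdown loop over timesteps. A minimal plain-Elixir sketch of that control flow follows. The module name `SamplingLoopSketch` and the `denoise_step` callback are hypothetical stand-ins and not part of Edifice, and actions are flat lists of floats rather than Nx tensors.

```elixir
defmodule SamplingLoopSketch do
  # Runs the reverse process: start from the initial noise and apply
  # `denoise_step` once per timestep, counting down from `num_steps` to 1.
  # `denoise_step` is a hypothetical function (a_t, t, obs) -> a_{t-1}.
  def run(initial, obs, num_steps, denoise_step) do
    Enum.reduce(num_steps..1//-1, initial, fn t, a_t ->
      denoise_step.(a_t, t, obs)
    end)
  end

  # Draws one standard-normal value per action entry, i.e. a_T ~ N(0, I).
  def initial_noise(size) when size > 0 do
    for _ <- 1..size, do: :rand.normal()
  end
end

# Toy stand-in for a trained denoiser: pull each entry halfway toward obs.
toy_step = fn a_t, _t, obs -> Enum.map(a_t, fn a -> a + 0.5 * (obs - a) end) end

noise = SamplingLoopSketch.initial_noise(4)
actions = SamplingLoopSketch.run(noise, 1.0, 10, toy_step)
IO.inspect(actions, label: "a_0")
```

In a real policy the callback would run the noise-prediction network and then apply one reverse step; the loop structure stays the same.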

Architecture

Observations [batch, obs_dim]
      |
      v
+-------------------------------------+
|  Observation Encoder                 |
|  (MLP or temporal backbone)          |
+-------------------------------------+
      |
      v obs_embed
+-------------------------------------+
|  Denoising Network                   |
|  Input: (noisy_actions, timestep,    |
|          obs_embed)                  |
|  Output: predicted_noise             |
+-------------------------------------+
      |
      v
Denoised Actions [batch, action_horizon, action_dim]

Advantages

| Feature     | Benefit                               |
|-------------|---------------------------------------|
| Multi-modal | Can represent multiple valid actions  |
| High-dim    | Scales well to action sequences       |
| Stable      | MSE loss is simple and stable         |
| Expressive  | Captures complex action distributions |

Usage

# Build diffusion policy
alias Edifice.Generative.Diffusion

model = Diffusion.build(
  obs_size: 287,
  action_dim: 64,
  action_horizon: 8,
  num_steps: 100
)

# Training: predict noise
{loss, predicted_noise} = Diffusion.training_step(
  model, params, observations, actions, key
)

# Inference: denoise to get actions
actions = Diffusion.sample(model, params, observations, key)

Summary

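Types

build_opt()

Options for build/1.

Functions

build(opts \\ [])

Build a Diffusion Policy model.

build_denoiser(noisy_actions, timestep, observations, opts)

Build the denoising network (noise predictor).

build_obs_encoder(opts \\ [])

Build the observation encoder for temporal inputs.

compute_loss(true_noise, predicted_noise)

Compute training loss: MSE between true and predicted noise.

default_action_horizon()

Default action prediction horizon

default_beta_end()

Beta schedule end

default_beta_start()

Beta schedule start

default_hidden_size()

Default hidden dimension

default_num_layers()

Default number of denoiser layers

default_num_steps()

Default number of diffusion timesteps

fast_inference_defaults()

Fast inference configuration with fewer diffusion steps.

make_schedule(opts \\ [])

Precompute diffusion schedule constants.

output_size(opts \\ [])

Get the output size of a Diffusion Policy model.

p_sample(noisy_actions, predicted_noise, timestep, random_noise, schedule)

Single denoising step (reverse process).

param_count(opts)

Calculate approximate parameter count for a Diffusion Policy model.

q_sample(actions, timestep, noise, schedule)

Forward diffusion: add noise to actions.

Get recommended defaults for action generation.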

Types

build_opt()

@type build_opt() ::
  {:obs_size, pos_integer()}
  | {:action_dim, pos_integer()}
  | {:action_horizon, pos_integer()}
  | {:hidden_size, pos_integer()}
  | {:num_layers, pos_integer()}
  | {:num_steps, pos_integer()}

Options for build/1.

Functions

build(opts \\ [])

@spec build([build_opt()]) :: Axon.t()

Build a Diffusion Policy model.

Options

  • :obs_size - Size of observation embedding (required)
  • :action_dim - Dimension of action space (required)
  • :action_horizon - Number of actions to predict (default: 8)
  • :hidden_size - Hidden dimension (default: 256)
  • :num_layers - Number of denoiser layers (default: 4)
  • :num_steps - Number of diffusion timesteps (default: 100)

Returns

An Axon model that predicts noise given (noisy_actions, timestep, obs).

build_denoiser(noisy_actions, timestep, observations, opts)

@spec build_denoiser(Axon.t(), Axon.t(), Axon.t(), keyword()) :: Axon.t()

Build the denoising network (noise predictor).

Architecture: MLP with sinusoidal timestep embedding and observation conditioning.
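
To illustrate what a sinusoidal timestep embedding looks like, here is a minimal plain-Elixir sketch. It assumes the common Transformer-style layout (sines followed by cosines, frequency base 10,000); the exact layout and base used inside `build_denoiser/4` are not stated here, and `TimestepEmbeddingSketch` is a hypothetical name.

```elixir
defmodule TimestepEmbeddingSketch do
  # Maps an integer timestep to a `dim`-length list of sines and cosines
  # at geometrically spaced frequencies. `dim` must be even and positive.
  def embed(t, dim) when dim > 0 and rem(dim, 2) == 0 do
    half = div(dim, 2)

    # Frequencies decay from 1.0 toward 1/10_000 across the half-width.
    freqs = for i <- 0..(half - 1), do: :math.exp(-:math.log(10_000.0) * i / half)

    sines = Enum.map(freqs, fn f -> :math.sin(t * f) end)
    cosines = Enum.map(freqs, fn f -> :math.cos(t * f) end)
    sines ++ cosines
  end
end

IO.inspect(TimestepEmbeddingSketch.embed(10, 8), label: "embedding for t = 10")
```

The embedding gives the network a smooth, bounded encoding of the noise level, which is typically easier to condition on than the raw integer timestep.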

build_obs_encoder(opts \\ [])

@spec build_obs_encoder(keyword()) :: Axon.t()

Build the observation encoder for temporal inputs.

Processes a sequence of observations into a single embedding.

compute_loss(true_noise, predicted_noise)

@spec compute_loss(Nx.Tensor.t(), Nx.Tensor.t()) :: Nx.Tensor.t()

Compute training loss: MSE between true and predicted noise.
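
As a rough illustration of the quantity being computed, the following plain-Elixir sketch takes the mean squared error over two flat lists of floats. `LossSketch` is a hypothetical name, and the real function operates on Nx tensors, so any reduction details (for example, which axes are averaged) are assumptions here.

```elixir
defmodule LossSketch do
  # Mean of squared element-wise differences between two equally sized,
  # non-empty lists of numbers.
  def mse(true_noise, predicted_noise)
      when true_noise != [] and length(true_noise) == length(predicted_noise) do
    squared_errors =
      Enum.zip(true_noise, predicted_noise)
      |> Enum.map(fn {eps, eps_hat} -> (eps - eps_hat) * (eps - eps_hat) end)

    Enum.sum(squared_errors) / length(squared_errors)
  end
end

IO.inspect(LossSketch.mse([0.3, -1.2, 0.8], [0.1, -1.0, 1.0]), label: "mse")
```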

default_action_horizon()

@spec default_action_horizon() :: pos_integer()

Default action prediction horizon

default_beta_end()

@spec default_beta_end() :: float()

Default end value of the beta (noise) schedule.

default_beta_start()

@spec default_beta_start() :: float()

Default start value of the beta (noise) schedule.

default_hidden_size()

@spec default_hidden_size() :: pos_integer()

Default hidden dimension

default_num_layers()

@spec default_num_layers() :: pos_integer()

Default number of denoiser layers

default_num_steps()

@spec default_num_steps() :: pos_integer()

Default number of diffusion timesteps

fast_inference_defaults()

@spec fast_inference_defaults() :: keyword()

Fast inference configuration with fewer diffusion steps.

make_schedule(opts \\ [])

@spec make_schedule(keyword()) :: map()

Precompute diffusion schedule constants.

Returns a map with:

  • :betas - Noise schedule beta_t
  • :alphas - alpha_t = 1 - beta_t
  • :alphas_cumprod - alpha_bar_t = product of alpha_s for s = 1..t
  • :sqrt_alphas_cumprod - sqrt(alpha_bar_t)
  • :sqrt_one_minus_alphas_cumprod - sqrt(1-alpha_bar_t)
  • :sqrt_recip_alphas - 1/sqrt(alpha_t)
  • :posterior_variance - beta_tilde_t, the variance of the noise added at each sampling (reverse) step
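
The constants listed above can be derived from the betas alone. The sketch below does so with plain lists, under several assumptions that this page does not confirm: a linear beta schedule, the values 1.0e-4 and 0.02 from the original DDPM setup, and the standard DDPM posterior variance beta_tilde_t = beta_t * (1 - alpha_bar_{t-1}) / (1 - alpha_bar_t). `ScheduleSketch` is a hypothetical name.

```elixir
defmodule ScheduleSketch do
  # Builds DDPM schedule constants as plain lists; index 0 holds t = 1.
  # Assumes a linear beta schedule, which may differ from the library's.
  def make(num_steps, beta_start, beta_end)
      when num_steps >= 2 and beta_start > 0 and beta_end < 1 do
    step = (beta_end - beta_start) / (num_steps - 1)
    betas = for i <- 0..(num_steps - 1), do: beta_start + i * step
    alphas = Enum.map(betas, fn b -> 1.0 - b end)

    # Running product: alpha_bar_t = alpha_1 * ... * alpha_t
    alphas_cumprod = Enum.scan(alphas, fn a, acc -> a * acc end)

    # alpha_bar_{t-1}, taking alpha_bar_0 = 1
    prev_cumprod = [1.0 | Enum.drop(alphas_cumprod, -1)]

    # beta_tilde_t = beta_t * (1 - alpha_bar_{t-1}) / (1 - alpha_bar_t)
    posterior_variance =
      [betas, prev_cumprod, alphas_cumprod]
      |> Enum.zip()
      |> Enum.map(fn {b, ab_prev, ab} -> b * (1.0 - ab_prev) / (1.0 - ab) end)

    %{
      betas: betas,
      alphas: alphas,
      alphas_cumprod: alphas_cumprod,
      sqrt_alphas_cumprod: Enum.map(alphas_cumprod, &:math.sqrt/1),
      sqrt_one_minus_alphas_cumprod:
        Enum.map(alphas_cumprod, fn ab -> :math.sqrt(1.0 - ab) end),
      sqrt_recip_alphas: Enum.map(alphas, fn a -> 1.0 / :math.sqrt(a) end),
      posterior_variance: posterior_variance
    }
  end
end

schedule = ScheduleSketch.make(100, 1.0e-4, 0.02)
IO.inspect(List.last(schedule.alphas_cumprod), label: "alpha_bar_T")
```

Precomputing these once avoids recomputing square roots and cumulative products on every training and sampling step.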

output_size(opts \\ [])

@spec output_size(keyword()) :: non_neg_integer()

Get the output size of a Diffusion Policy model.

Returns action_horizon * action_dim.
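
As a worked example with the values from the Usage section (action_horizon 8, action_dim 64), the flat output has 8 * 64 = 512 entries. The sketch below also shows one way to split such a flat list back into per-step rows, assuming a row-major layout (one row per future step); that layout and the name `OutputShapeSketch` are assumptions.

```elixir
defmodule OutputShapeSketch do
  # Flat output size for a given horizon and per-step action dimension.
  def output_size(action_horizon, action_dim), do: action_horizon * action_dim

  # Splits a flat list into rows of `action_dim` values each,
  # assuming row-major order (one row per predicted step).
  def unflatten(flat, action_dim), do: Enum.chunk_every(flat, action_dim)
end

size = OutputShapeSketch.output_size(8, 64)
rows = OutputShapeSketch.unflatten(Enum.to_list(1..size), 64)
IO.inspect({size, length(rows)}, label: "{flat size, number of rows}")
```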

p_sample(noisy_actions, predicted_noise, timestep, random_noise, schedule)

@spec p_sample(Nx.Tensor.t(), Nx.Tensor.t(), Nx.Tensor.t(), Nx.Tensor.t(), map()) ::
  Nx.Tensor.t()

Single denoising step (reverse process).

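a_{t-1} = (1/sqrt(alpha_t)) * (a_t - (beta_t/sqrt(1-alpha_bar_t)) * eps_hat) + sqrt(beta_tilde_t) * z

Here a_t is noisy_actions, eps_hat is predicted_noise, and z is random_noise; the alpha, beta, and beta_tilde terms are the schedule constants for the given timestep.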
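
The update rule can be sketched element-wise in plain Elixir. This version takes the per-timestep coefficients directly in a small map instead of indexing a schedule with a timestep tensor; the map keys and the name `ReverseStepSketch` are hypothetical, and actions are flat lists rather than Nx tensors.

```elixir
defmodule ReverseStepSketch do
  # One reverse step:
  # a_{t-1} = (1/sqrt(alpha_t)) * (a_t - (beta_t/sqrt(1-alpha_bar_t)) * eps_hat)
  #           + sqrt(beta_tilde_t) * z
  def p_sample(a_t, eps_hat, z, %{beta: beta, alpha: alpha, alpha_bar: alpha_bar, post_var: post_var}) do
    scale = 1.0 / :math.sqrt(alpha)
    noise_coef = beta / :math.sqrt(1.0 - alpha_bar)
    sigma = :math.sqrt(post_var)

    [a_t, eps_hat, z]
    |> Enum.zip()
    |> Enum.map(fn {a, e, n} -> scale * (a - noise_coef * e) + sigma * n end)
  end
end

# At t = 1, alpha_bar_1 = alpha_1 and beta_tilde_1 = 0, so a perfect noise
# prediction removes the noise exactly and z has no effect.
coeffs = %{beta: 0.19, alpha: 0.81, alpha_bar: 0.81, post_var: 0.0}
eps = 1.0
a_1 = :math.sqrt(0.81) * 2.0 + :math.sqrt(0.19) * eps
IO.inspect(ReverseStepSketch.p_sample([a_1], [eps], [0.0], coeffs), label: "a_0 (approx. 2.0)")
```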

param_count(opts)

@spec param_count(keyword()) :: non_neg_integer()

Calculate approximate parameter count for a Diffusion Policy model.

q_sample(actions, timestep, noise, schedule)

@spec q_sample(Nx.Tensor.t(), Nx.Tensor.t(), Nx.Tensor.t(), map()) :: Nx.Tensor.t()

Forward diffusion: add noise to actions.

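a_t = sqrt(alpha_bar_t) * a_0 + sqrt(1-alpha_bar_t) * eps

Here a_0 is actions, eps is noise, and alpha_bar_t is the cumulative schedule constant for the given timestep.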
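
A minimal plain-Elixir sketch of this formula, applied element-wise to flat lists, is shown below. It takes alpha_bar_t as a float rather than looking it up in a schedule by timestep, and `ForwardNoiseSketch` is a hypothetical name.

```elixir
defmodule ForwardNoiseSketch do
  # a_t = sqrt(alpha_bar_t) * a_0 + sqrt(1 - alpha_bar_t) * eps
  def q_sample(a_0, eps, alpha_bar_t) when alpha_bar_t >= 0 and alpha_bar_t <= 1 do
    signal_scale = :math.sqrt(alpha_bar_t)
    noise_scale = :math.sqrt(1.0 - alpha_bar_t)

    Enum.zip(a_0, eps)
    |> Enum.map(fn {a, e} -> signal_scale * a + noise_scale * e end)
  end
end

clean = [1.0, -1.0]
noise = for _ <- clean, do: :rand.normal()

# Small alpha_bar_t (late timesteps) yields mostly noise;
# alpha_bar_t near 1 (early timesteps) stays close to the clean actions.
IO.inspect(ForwardNoiseSketch.q_sample(clean, noise, 0.99), label: "early t")
IO.inspect(ForwardNoiseSketch.q_sample(clean, noise, 0.01), label: "late t")
```

Because a_t has a closed form in terms of a_0, training can sample any timestep directly without simulating the intermediate noising steps.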