Fivetrex

Elixir client library for the Fivetran REST API.

Fivetrex provides an idiomatic Elixir interface for managing Fivetran resources: Groups, Connectors, Destinations, and Webhooks. Built on Req, it offers stream-based pagination, structured error handling, and a functional API.

Features

  • Core API Coverage - Full CRUD operations for Groups, Connectors, Destinations, and Webhooks
  • Webhook Support - Create/manage webhooks with HMAC-SHA256 signature verification and a ready-to-use Plug for Phoenix
  • Schema Metadata - Query and configure connector schema, table, and column settings
  • Stream-based Pagination - Efficiently iterate over thousands of resources using Elixir Streams
  • Typed Structs - All responses are parsed into structs, so unknown keys in struct patterns are caught at compile time
  • Structured Errors - Pattern-matchable error types for robust error handling
  • Built-in Retry - Automatic retry with exponential backoff for rate limits and transient errors
  • Safety Valves - Destructive operations like resync! require explicit confirmation
  • Zero Configuration - Works out of the box with just API credentials

API Coverage

Fivetrex covers the core Fivetran API resources needed for managing data pipelines:

| Fivetran API Resource | Status | Functions |
| --- | --- | --- |
| Groups | ✅ Full | list, stream, get, create, update, delete |
| Connectors | ✅ Full | list, stream, get, create, update, delete, sync, resync!, pause, resume, get_state, get_sync_status, get_schema_config, update_schema_config, reload_schema_config, get_table_columns, set_sync_frequency |
| Destinations | ✅ Full | get, create, update, delete, test |
| Webhooks | ✅ Full | list, stream, get, create_account, create_group, update, delete, test |
| Users | Not implemented | |
| Teams | Not implemented | |
| Roles | Not implemented | |
| Transformations | Not implemented | |
| Certificates | Not implemented | |
| Log Services | Not implemented | |

Note: The implemented resources (Groups, Connectors, Destinations, Webhooks) cover the most commonly used Fivetran functionality for managing data pipelines programmatically.

Installation

Add fivetrex to your list of dependencies in mix.exs:

def deps do
  [
    {:fivetrex, "~> 0.2.0"}
  ]
end

Quick Start

Creating a Client

All API operations require a client configured with your Fivetran API credentials:

# Create a client with explicit credentials
client = Fivetrex.client(
  api_key: "your_api_key",
  api_secret: "your_api_secret"
)

# Or use environment variables
client = Fivetrex.client(
  api_key: System.get_env("FIVETRAN_API_KEY"),
  api_secret: System.get_env("FIVETRAN_API_SECRET")
)
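
Note that System.get_env/1 returns nil for an unset variable, so the second form above can build a client with nil credentials that may only fail on the first request. A minimal sketch of a fail-fast alternative follows; the CredentialsSketch module and its from_env function are hypothetical helpers, not part of Fivetrex.

```elixir
defmodule CredentialsSketch do
  @moduledoc "Hypothetical helper for illustration; not part of Fivetrex."

  # Reads both variables and returns client options, or names the missing ones.
  # `fetch` defaults to System.fetch_env/1 and is injectable for tests.
  def from_env(fetch \\ &System.fetch_env/1) do
    vars = [api_key: "FIVETRAN_API_KEY", api_secret: "FIVETRAN_API_SECRET"]

    {found, missing} =
      Enum.reduce(vars, {[], []}, fn {opt, var}, {found, missing} ->
        case fetch.(var) do
          # Treat an empty string the same as an unset variable.
          {:ok, value} when value != "" -> {[{opt, value} | found], missing}
          _ -> {found, [var | missing]}
        end
      end)

    case missing do
      [] -> {:ok, Enum.reverse(found)}
      _ -> {:error, {:missing_env, Enum.reverse(missing)}}
    end
  end
end
```

On success, the returned keyword list has the same shape as the options shown above and can be passed to Fivetrex.client.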

Basic Operations

# List all groups
{:ok, %{items: groups, next_cursor: _}} = Fivetrex.Groups.list(client)

# Get a specific group
{:ok, group} = Fivetrex.Groups.get(client, "group_id")

# Create a new group
{:ok, group} = Fivetrex.Groups.create(client, %{name: "My Data Warehouse"})

# List connectors in a group
{:ok, %{items: connectors}} = Fivetrex.Connectors.list(client, group.id)

# Trigger a sync
{:ok, _} = Fivetrex.Connectors.sync(client, "connector_id")

# Pause and resume connectors
{:ok, _} = Fivetrex.Connectors.pause(client, "connector_id")
{:ok, _} = Fivetrex.Connectors.resume(client, "connector_id")

Streaming

Fivetrex uses Elixir Streams to handle Fivetran's cursor-based pagination transparently. This allows you to iterate over thousands of resources without loading them all into memory:

# Stream all groups
client
|> Fivetrex.Groups.stream()
|> Enum.each(fn group ->
  IO.puts("Group: #{group.name}")
end)

# Find all syncing connectors across all groups
syncing_connectors =
  client
  |> Fivetrex.Groups.stream()
  |> Stream.flat_map(fn group ->
    Fivetrex.Connectors.stream(client, group.id)
  end)
  |> Stream.filter(&Fivetrex.Models.Connector.syncing?/1)
  |> Enum.to_list()

# Take only the first 10 broken connectors
broken =
  Fivetrex.Connectors.stream(client, "group_id")
  |> Stream.filter(fn c -> c.status["setup_state"] == "broken" end)
  |> Enum.take(10)
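
To illustrate how cursor-based pagination can be exposed lazily, here is a minimal sketch built on Stream.resource/3. It is not Fivetrex's actual implementation. The PaginationSketch module and the fetch_page function are assumptions: fetch_page takes a cursor (nil for the first page) and returns a result shaped like the list responses shown earlier.

```elixir
defmodule PaginationSketch do
  @moduledoc "Illustrative only; not Fivetrex's implementation."

  # `fetch_page` is a hypothetical 1-arity function. It receives a cursor
  # (nil for the first page) and returns
  # {:ok, %{items: list, next_cursor: cursor_or_nil}} or {:error, reason}.
  def stream(fetch_page) do
    Stream.resource(
      # Start with no cursor; nothing is fetched until the stream is consumed.
      fn -> {:cursor, nil} end,
      fn
        :done ->
          {:halt, :done}

        {:cursor, cursor} ->
          case fetch_page.(cursor) do
            # Last page: emit its items, then halt on the next step.
            {:ok, %{items: items, next_cursor: nil}} -> {items, :done}
            {:ok, %{items: items, next_cursor: next}} -> {items, {:cursor, next}}
            {:error, reason} -> raise "page fetch failed: #{inspect(reason)}"
          end
      end,
      # Nothing to clean up.
      fn _state -> :ok end
    )
  end
end
```

Because pages are requested only on demand, a consumer such as Enum.take/2 stops issuing requests as soon as it has enough items.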

Working with Connectors

Creating a Connector

{:ok, connector} = Fivetrex.Connectors.create(client, %{
  group_id: "group_id",
  service: "postgres",
  config: %{
    host: "db.example.com",
    port: 5432,
    database: "production",
    user: "fivetran_user",
    password: "secret"
  }
})

Sync Operations

# Trigger an incremental sync
{:ok, _} = Fivetrex.Connectors.sync(client, connector.id)

# Get current sync state
{:ok, state} = Fivetrex.Connectors.get_state(client, connector.id)

# Historical resync (DANGEROUS - requires confirmation)
# This wipes all data and re-imports from scratch
{:ok, _} = Fivetrex.Connectors.resync!(client, connector.id, confirm: true)

Connector Helper Functions

alias Fivetrex.Models.Connector

# Check connector status
Connector.syncing?(connector)   # => true/false
Connector.paused?(connector)    # => true/false
Connector.sync_state(connector) # => "scheduled" | "syncing" | "paused" | nil

Working with Destinations

# Get a destination
{:ok, destination} = Fivetrex.Destinations.get(client, "destination_id")

# Create a Snowflake destination
{:ok, destination} = Fivetrex.Destinations.create(client, %{
  group_id: "group_id",
  service: "snowflake",
  region: "US",
  time_zone_offset: "-5",
  config: %{
    host: "account.snowflakecomputing.com",
    port: 443,
    database: "ANALYTICS",
    auth: "PASSWORD",
    user: "FIVETRAN_USER",
    password: "secret"
  }
})

# Test destination connectivity
{:ok, result} = Fivetrex.Destinations.test(client, destination.id)

Working with Webhooks

Webhooks provide real-time notifications about Fivetran events like sync starts and completions.

Creating Webhooks

# Create an account-level webhook (receives events for all connectors)
{:ok, webhook} = Fivetrex.Webhooks.create_account(client, %{
  url: "https://example.com/fivetran/webhook",
  events: ["sync_start", "sync_end"],
  active: true,
  secret: "my_webhook_secret"
})

# Create a group-level webhook (receives events for connectors in that group)
{:ok, webhook} = Fivetrex.Webhooks.create_group(client, "group_id", %{
  url: "https://example.com/fivetran/webhook",
  events: ["sync_end"],
  active: true
})

# List all webhooks
{:ok, %{items: webhooks}} = Fivetrex.Webhooks.list(client)

# Test a webhook
{:ok, _} = Fivetrex.Webhooks.test(client, webhook.id)

Handling Incoming Webhooks

Fivetrex includes a Plug for Phoenix/Bandit applications that handles signature verification automatically:

# In your Phoenix controller
defmodule MyAppWeb.FivetranWebhookController do
  use MyAppWeb, :controller

  plug Fivetrex.WebhookPlug,
    secret: {MyApp.Config, :fivetran_webhook_secret, []}

  def receive(conn, _params) do
    event = conn.assigns.fivetran_event

    case event.event do
      "sync_end" -> handle_sync_completion(event)
      "sync_start" -> handle_sync_start(event)
      _ -> :ok
    end

    json(conn, %{status: "ok"})
  end
end

For manual signature verification:

# Verify webhook signature
case Fivetrex.WebhookSignature.verify(raw_body, signature, secret) do
  :ok -> process_webhook(payload)
  {:error, :invalid_signature} -> reject_request()
  {:error, :missing_signature} -> reject_request()
end
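
For readers curious about what HMAC-SHA256 verification involves, the following is a minimal sketch using only Erlang's :crypto module. It is not the code behind Fivetrex.WebhookSignature. The SignatureSketch module is hypothetical, and it assumes the signature arrives as a hex-encoded digest of the raw request body.

```elixir
defmodule SignatureSketch do
  @moduledoc "Illustrative only; not the Fivetrex.WebhookSignature implementation."
  import Bitwise

  # Hex-encoded (uppercase) HMAC-SHA256 of the raw body, keyed by the secret.
  def compute(body, secret) do
    :crypto.mac(:hmac, :sha256, secret, body) |> Base.encode16(case: :upper)
  end

  def verify(_body, nil, _secret), do: {:error, :missing_signature}

  def verify(body, signature, secret) when is_binary(signature) do
    expected = compute(body, secret)

    # Assumption: hex case is not significant, so normalise before comparing.
    if secure_equal?(expected, String.upcase(signature)) do
      :ok
    else
      {:error, :invalid_signature}
    end
  end

  # Compares every byte instead of stopping at the first mismatch, so the
  # time taken does not reveal how much of the signature was correct.
  defp secure_equal?(a, b) when byte_size(a) == byte_size(b) do
    a
    |> :binary.bin_to_list()
    |> Enum.zip(:binary.bin_to_list(b))
    |> Enum.reduce(0, fn {x, y}, acc -> acc ||| bxor(x, y) end)
    |> Kernel.==(0)
  end

  defp secure_equal?(_a, _b), do: false
end
```

The signature must be computed over the raw request body exactly as received; re-encoding the parsed JSON will generally produce a different digest.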

Schema Metadata

Query and configure which schemas, tables, and columns are synced.

# Get schema configuration for a connector
{:ok, config} = Fivetrex.Connectors.get_schema_config(client, "connector_id")

# Iterate through schemas and tables
for {schema_name, schema} <- config.schemas, schema.enabled do
  IO.puts("Schema: #{schema_name}")

  for {table_name, table} <- schema.tables, table.enabled do
    IO.puts("  Table: #{table_name} (#{table.sync_mode})")
  end
end

# Get columns for a specific table
{:ok, columns} = Fivetrex.Connectors.get_table_columns(
  client,
  "connector_id",
  "schema_name",
  "table_name"
)

# Update schema configuration
{:ok, updated} = Fivetrex.Connectors.update_schema_config(client, "connector_id", %{
  schemas: %{
    "public" => %{
      enabled: true,
      tables: %{
        "users" => %{enabled: true},
        "logs" => %{enabled: false}
      }
    }
  }
})

# Reload schema (detect new tables/columns from source)
{:ok, config} = Fivetrex.Connectors.reload_schema_config(client, "connector_id")
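
When many tables are toggled at once, the nested map passed to update_schema_config can be generated rather than written by hand. A minimal sketch, assuming the payload shape shown above; the SchemaPatchSketch module and its :enable and :disable options are hypothetical.

```elixir
defmodule SchemaPatchSketch do
  @moduledoc "Hypothetical helper for illustration; not part of Fivetrex."

  # Builds the nested map for a single schema from two lists of table names.
  # A table listed under both :enable and :disable ends up disabled.
  def build(schema, opts) do
    enabled = Keyword.get(opts, :enable, [])
    disabled = Keyword.get(opts, :disable, [])

    tables =
      Map.merge(
        Map.new(enabled, &{&1, %{enabled: true}}),
        Map.new(disabled, &{&1, %{enabled: false}})
      )

    %{schemas: %{schema => %{enabled: true, tables: tables}}}
  end
end
```

Calling SchemaPatchSketch.build("public", enable: ["users"], disable: ["logs"]) produces the same map as the hand-written update above.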

Sync Status and Frequency

# Get current sync status
{:ok, status} = Fivetrex.Connectors.get_sync_status(client, "connector_id")

if Fivetrex.Models.SyncStatus.syncing?(status) do
  IO.puts("Sync in progress...")
end

IO.puts("Last successful sync: #{status.succeeded_at}")

# Set sync frequency (in minutes)
{:ok, connector} = Fivetrex.Connectors.set_sync_frequency(client, "connector_id", 60)
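
A common follow-up is waiting for a sync to finish. The sketch below is a generic polling loop rather than a Fivetrex feature; the PollSketch module, its option names, and the :done/:pending convention are all assumptions made for illustration.

```elixir
defmodule PollSketch do
  @moduledoc "Hypothetical helper for illustration; not part of Fivetrex."

  # `check` is any zero-arity function returning :done or :pending.
  # Returns :ok once `check` reports :done, or {:error, :timeout} after
  # `:attempts` checks spaced `:interval_ms` apart.
  def wait_until(check, opts \\ []) do
    attempts = Keyword.get(opts, :attempts, 30)
    interval = Keyword.get(opts, :interval_ms, 10_000)
    do_wait(check, attempts, interval)
  end

  defp do_wait(_check, 0, _interval), do: {:error, :timeout}

  defp do_wait(check, attempts, interval) do
    case check.() do
      :done ->
        :ok

      :pending ->
        Process.sleep(interval)
        do_wait(check, attempts - 1, interval)
    end
  end
end

# Usage with the calls shown above (assumes `client` is in scope):
#
#   PollSketch.wait_until(fn ->
#     {:ok, status} = Fivetrex.Connectors.get_sync_status(client, "connector_id")
#     if Fivetrex.Models.SyncStatus.syncing?(status), do: :pending, else: :done
#   end)
```

For event-driven workflows, the sync_end webhook is likely a better fit than polling, since it avoids spending API requests on status checks.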

Error Handling

All API functions return {:ok, result} on success or {:error, %Fivetrex.Error{}} on failure. Errors are structured for easy pattern matching:

case Fivetrex.Connectors.get(client, "connector_id") do
  {:ok, connector} ->
    # Success - connector is a %Fivetrex.Models.Connector{}
    IO.puts("Found connector: #{connector.id}")

  {:error, %Fivetrex.Error{type: :not_found}} ->
    # 404 - Resource doesn't exist
    IO.puts("Connector not found")

  {:error, %Fivetrex.Error{type: :unauthorized}} ->
    # 401 - Invalid API credentials
    IO.puts("Check your API key and secret")

  {:error, %Fivetrex.Error{type: :rate_limited, retry_after: seconds}} ->
    # 429 - Too many requests
    IO.puts("Rate limited, retry after #{seconds} seconds")
    Process.sleep(seconds * 1000)
    # Retry...

  {:error, %Fivetrex.Error{type: :server_error, status: status}} ->
    # 5xx - Fivetran server error
    IO.puts("Server error: #{status}")

  {:error, %Fivetrex.Error{message: message}} ->
    # Catch-all for other errors
    IO.puts("Error: #{message}")
end
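
The `# Retry...` step above can be factored into a small helper. Fivetrex already retries rate limits and transient errors internally, so this is only a sketch of an application-level retry for cases that outlast the built-in one. The RetrySketch module and its options are hypothetical. It matches on plain maps with a :type key, which also matches a struct carrying the type and retry_after fields shown above.

```elixir
defmodule RetrySketch do
  @moduledoc "Hypothetical helper for illustration; not part of Fivetrex."

  # Calls `fun` until it returns something other than a retryable error,
  # or until `:max_attempts` calls have been made. `:sleep` is injectable
  # so tests do not have to wait.
  def with_retry(fun, opts \\ []) do
    max = Keyword.get(opts, :max_attempts, 5)
    base = Keyword.get(opts, :base_delay_ms, 500)
    sleep = Keyword.get(opts, :sleep, &Process.sleep/1)
    attempt(fun, 1, max, base, sleep)
  end

  defp attempt(fun, n, max, base, sleep) do
    case fun.() do
      {:error, %{type: type} = error}
      when type in [:rate_limited, :server_error] and n < max ->
        sleep.(delay_ms(error, n, base))
        attempt(fun, n + 1, max, base, sleep)

      # Success, a non-retryable error, or the final failed attempt.
      other ->
        other
    end
  end

  # Prefer the server's hint (in seconds) when present; otherwise double
  # the delay on each attempt: base, 2 * base, 4 * base, ...
  defp delay_ms(%{retry_after: seconds}, _n, _base) when is_integer(seconds),
    do: seconds * 1000

  defp delay_ms(_error, n, base), do: base * Integer.pow(2, n - 1)
end
```

Errors such as :not_found and :unauthorized are returned immediately, since repeating the request would not change the outcome.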

Error Types

| Type | HTTP Status | Description |
| --- | --- | --- |
| :unauthorized | 401 | Invalid or missing API credentials |
| :not_found | 404 | Resource does not exist |
| :rate_limited | 429 | Too many requests (check retry_after) |
| :server_error | 5xx | Fivetran server error |
| :unknown | Other | Unexpected error |
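
The mapping in this table can be written as a function with one clause per row. This is a sketch of the classification only, under the assumption that the HTTP status alone determines the type; the ErrorTypeSketch module is hypothetical and not how Fivetrex necessarily builds its errors.

```elixir
defmodule ErrorTypeSketch do
  @moduledoc "Illustrative only; not the Fivetrex.Error implementation."

  # One clause per row of the error-type table; anything else is :unknown.
  def type_for_status(401), do: :unauthorized
  def type_for_status(404), do: :not_found
  def type_for_status(429), do: :rate_limited
  def type_for_status(status) when status in 500..599, do: :server_error
  def type_for_status(_status), do: :unknown
end
```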

API Reference

Groups

| Function | Description |
| --- | --- |
| Fivetrex.Groups.list/2 | List all groups with pagination |
| Fivetrex.Groups.stream/2 | Stream all groups (handles pagination) |
| Fivetrex.Groups.get/2 | Get a group by ID |
| Fivetrex.Groups.create/2 | Create a new group |
| Fivetrex.Groups.update/3 | Update a group |
| Fivetrex.Groups.delete/2 | Delete a group |

Connectors

| Function | Description |
| --- | --- |
| Fivetrex.Connectors.list/3 | List connectors in a group |
| Fivetrex.Connectors.stream/3 | Stream all connectors in a group |
| Fivetrex.Connectors.get/2 | Get a connector by ID |
| Fivetrex.Connectors.create/2 | Create a new connector |
| Fivetrex.Connectors.update/3 | Update a connector |
| Fivetrex.Connectors.delete/2 | Delete a connector |
| Fivetrex.Connectors.sync/2 | Trigger an incremental sync |
| Fivetrex.Connectors.resync!/3 | Trigger a historical resync (destructive!) |
| Fivetrex.Connectors.get_state/2 | Get connector sync state |
| Fivetrex.Connectors.pause/2 | Pause a connector |
| Fivetrex.Connectors.resume/2 | Resume a paused connector |
| Fivetrex.Connectors.get_sync_status/2 | Get sync status summary |
| Fivetrex.Connectors.set_sync_frequency/3 | Set sync frequency in minutes |
| Fivetrex.Connectors.get_schema_config/2 | Get schema/table/column configuration |
| Fivetrex.Connectors.update_schema_config/3 | Update schema configuration |
| Fivetrex.Connectors.reload_schema_config/2 | Reload schema from source |
| Fivetrex.Connectors.get_table_columns/4 | Get columns for a specific table |

Destinations

| Function | Description |
| --- | --- |
| Fivetrex.Destinations.get/2 | Get a destination by ID |
| Fivetrex.Destinations.create/2 | Create a new destination |
| Fivetrex.Destinations.update/3 | Update a destination |
| Fivetrex.Destinations.delete/2 | Delete a destination |
| Fivetrex.Destinations.test/2 | Run destination connection tests |

Webhooks

| Function | Description |
| --- | --- |
| Fivetrex.Webhooks.list/2 | List all webhooks |
| Fivetrex.Webhooks.stream/2 | Stream all webhooks |
| Fivetrex.Webhooks.get/2 | Get a webhook by ID |
| Fivetrex.Webhooks.create_account/2 | Create an account-level webhook |
| Fivetrex.Webhooks.create_group/3 | Create a group-level webhook |
| Fivetrex.Webhooks.update/3 | Update a webhook |
| Fivetrex.Webhooks.delete/2 | Delete a webhook |
| Fivetrex.Webhooks.test/2 | Send a test event to a webhook |

Webhook Handling

| Function/Module | Description |
| --- | --- |
| Fivetrex.WebhookPlug | Plug for Phoenix webhook endpoints |
| Fivetrex.WebhookSignature.verify/3 | Verify webhook signature |
| Fivetrex.WebhookSignature.compute_signature/2 | Compute HMAC-SHA256 signature |

Configuration

Runtime Configuration

Fivetrex is designed for runtime configuration. Create clients with credentials read at runtime rather than at compile time, so that secrets are not baked into a compiled release:

# In your application code
defmodule MyApp.Fivetran do
  def client do
    Fivetrex.client(
      api_key: Application.get_env(:my_app, :fivetran_api_key),
      api_secret: Application.get_env(:my_app, :fivetran_api_secret)
    )
  end
end

# In config/runtime.exs
config :my_app,
  fivetran_api_key: System.get_env("FIVETRAN_API_KEY"),
  fivetran_api_secret: System.get_env("FIVETRAN_API_SECRET")

Testing with Custom Base URL

For testing, you can override the base URL:

client = Fivetrex.client(
  api_key: "test",
  api_secret: "test",
  base_url: "http://localhost:4000"
)

Development

# Run all checks (format, credo, compile, test)
mix precommit

# Run CI checks (check-formatted, credo, compile, test + integration)
mix ci

# Run tests
mix test

# Run tests with coverage
mix test --cover

Fivetrex uses Bypass for unit tests. Integration tests run against the real Fivetran API and require credentials in .env.

Documentation

Generate documentation locally:

mix docs
open doc/index.html

License

MIT License. See LICENSE for details.