Fivetrex
Elixir client library for the Fivetran REST API.
Fivetrex provides an idiomatic Elixir interface for managing Fivetran resources, including Groups, Connectors, Destinations, and Webhooks. Built on Req, it offers streaming pagination, structured error handling, and a clean functional API.
Features
- Core API Coverage - Full CRUD operations for Groups, Connectors, Destinations, and Webhooks
- Webhook Support - Create/manage webhooks with HMAC-SHA256 signature verification and a ready-to-use Plug for Phoenix
- Schema Metadata - Query and configure connector schema, table, and column settings
- Stream-based Pagination - Efficiently iterate over thousands of resources using Elixir Streams
- Typed Structs - All responses are parsed into structs, so misspelled field names in struct patterns are caught at compile time
- Structured Errors - Pattern-matchable error types for robust error handling
- Built-in Retry - Automatic retry with exponential backoff for rate limits and transient errors
- Safety Valves - Destructive operations like resync! require explicit confirmation
- Zero Configuration - Works out of the box with just API credentials
API Coverage
Fivetrex covers the core Fivetran API resources needed for managing data pipelines:
| Fivetran API Resource | Status | Functions |
|---|---|---|
| Groups | ✅ Full | list, stream, get, create, update, delete |
| Connectors | ✅ Full | list, stream, get, create, update, delete, sync, resync!, pause, resume, get_state, get_sync_status, get_schema_config, update_schema_config, reload_schema_config, get_table_columns, set_sync_frequency |
| Destinations | ✅ Full | get, create, update, delete, test |
| Webhooks | ✅ Full | list, stream, get, create_account, create_group, update, delete, test |
| Users | ❌ | Not implemented |
| Teams | ❌ | Not implemented |
| Roles | ❌ | Not implemented |
| Transformations | ❌ | Not implemented |
| Certificates | ❌ | Not implemented |
| Log Services | ❌ | Not implemented |
Note: The implemented resources (Groups, Connectors, Destinations, Webhooks) cover the most commonly used Fivetran functionality for managing data pipelines programmatically.
Installation
Add fivetrex to your list of dependencies in mix.exs:
def deps do
[
{:fivetrex, "~> 0.2.0"}
]
end
Quick Start
Creating a Client
All API operations require a client configured with your Fivetran API credentials:
# Create a client with explicit credentials
client = Fivetrex.client(
api_key: "your_api_key",
api_secret: "your_api_secret"
)
# Or use environment variables
client = Fivetrex.client(
api_key: System.get_env("FIVETRAN_API_KEY"),
api_secret: System.get_env("FIVETRAN_API_SECRET")
)
Basic Operations
# List all groups
{:ok, %{items: groups, next_cursor: _}} = Fivetrex.Groups.list(client)
# Get a specific group
{:ok, group} = Fivetrex.Groups.get(client, "group_id")
# Create a new group
{:ok, group} = Fivetrex.Groups.create(client, %{name: "My Data Warehouse"})
# List connectors in a group
{:ok, %{items: connectors}} = Fivetrex.Connectors.list(client, group.id)
# Trigger a sync
{:ok, _} = Fivetrex.Connectors.sync(client, "connector_id")
# Pause and resume connectors
{:ok, _} = Fivetrex.Connectors.pause(client, "connector_id")
{:ok, _} = Fivetrex.Connectors.resume(client, "connector_id")
Streaming
Fivetrex uses Elixir Streams to handle Fivetran's cursor-based pagination transparently. This allows you to iterate over thousands of resources without loading them all into memory:
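To illustrate the general idea of wrapping cursor-based pagination in a lazy stream, here is a minimal sketch built on `Stream.resource/3` from the standard library. It is not Fivetrex's actual implementation: the `PaginationSketch` module, the in-memory `@pages` data, and `fetch_page/1` are hypothetical stand-ins for real HTTP calls.

```elixir
defmodule PaginationSketch do
  @moduledoc false

  # Hypothetical in-memory "API": three pages linked by cursors.
  # A nil cursor requests the first page; a nil next_cursor marks the last one.
  @pages %{
    nil => %{items: [1, 2], next_cursor: "a"},
    "a" => %{items: [3, 4], next_cursor: "b"},
    "b" => %{items: [5], next_cursor: nil}
  }

  # Stand-in for an HTTP request that returns one page of results.
  def fetch_page(cursor), do: {:ok, Map.fetch!(@pages, cursor)}

  # Returns a lazy stream that fetches a page only when the consumer needs it.
  def stream(fetch_fun \\ &__MODULE__.fetch_page/1) do
    Stream.resource(
      # Start with no cursor, which requests the first page.
      fn -> {:cont, nil} end,
      fn
        # The previous page was the last one: stop.
        :done ->
          {:halt, :done}

        # Fetch the page for this cursor and emit its items.
        {:cont, cursor} ->
          case fetch_fun.(cursor) do
            {:ok, %{items: items, next_cursor: nil}} -> {items, :done}
            {:ok, %{items: items, next_cursor: next}} -> {items, {:cont, next}}
            {:error, reason} -> raise "page fetch failed: #{inspect(reason)}"
          end
      end,
      # Nothing to clean up in this sketch.
      fn _state -> :ok end
    )
  end
end

# Pages are fetched one at a time, only as the consumer demands items.
PaginationSketch.stream() |> Enum.take(3)
```

Because the stream is lazy, consumers such as `Enum.take/2` stop requesting pages as soon as they have enough items, which is what keeps memory use flat for large result sets.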
# Stream all groups
client
|> Fivetrex.Groups.stream()
|> Enum.each(fn group ->
IO.puts("Group: #{group.name}")
end)
# Find all syncing connectors across all groups
syncing_connectors =
client
|> Fivetrex.Groups.stream()
|> Stream.flat_map(fn group ->
Fivetrex.Connectors.stream(client, group.id)
end)
|> Stream.filter(&Fivetrex.Models.Connector.syncing?/1)
|> Enum.to_list()
# Take only the first 10 broken connectors
broken =
Fivetrex.Connectors.stream(client, "group_id")
|> Stream.filter(fn c -> c.status["sync_state"] == "broken" end)
|> Enum.take(10)
Working with Connectors
Creating a Connector
{:ok, connector} = Fivetrex.Connectors.create(client, %{
group_id: "group_id",
service: "postgres",
config: %{
host: "db.example.com",
port: 5432,
database: "production",
user: "fivetran_user",
password: "secret"
}
})
Sync Operations
# Trigger an incremental sync
{:ok, _} = Fivetrex.Connectors.sync(client, connector.id)
# Get current sync state
{:ok, state} = Fivetrex.Connectors.get_state(client, connector.id)
# Historical resync (DANGEROUS - requires confirmation)
# This wipes all data and re-imports from scratch
{:ok, _} = Fivetrex.Connectors.resync!(client, connector.id, confirm: true)
Connector Helper Functions
alias Fivetrex.Models.Connector
# Check connector status
Connector.syncing?(connector) # => true/false
Connector.paused?(connector) # => true/false
Connector.sync_state(connector) # => "scheduled" | "syncing" | "paused" | nil
Working with Destinations
# Get a destination
{:ok, destination} = Fivetrex.Destinations.get(client, "destination_id")
# Create a Snowflake destination
{:ok, destination} = Fivetrex.Destinations.create(client, %{
group_id: "group_id",
service: "snowflake",
region: "US",
time_zone_offset: "-5",
config: %{
host: "account.snowflakecomputing.com",
port: 443,
database: "ANALYTICS",
auth: "PASSWORD",
user: "FIVETRAN_USER",
password: "secret"
}
})
# Test destination connectivity
{:ok, result} = Fivetrex.Destinations.test(client, destination.id)
Working with Webhooks
Webhooks provide real-time notifications about Fivetran events like sync starts and completions.
Creating Webhooks
# Create an account-level webhook (receives events for all connectors)
{:ok, webhook} = Fivetrex.Webhooks.create_account(client, %{
url: "https://example.com/fivetran/webhook",
events: ["sync_start", "sync_end"],
active: true,
secret: "my_webhook_secret"
})
# Create a group-level webhook (receives events for connectors in that group)
{:ok, webhook} = Fivetrex.Webhooks.create_group(client, "group_id", %{
url: "https://example.com/fivetran/webhook",
events: ["sync_end"],
active: true
})
# List all webhooks
{:ok, %{items: webhooks}} = Fivetrex.Webhooks.list(client)
# Test a webhook
{:ok, _} = Fivetrex.Webhooks.test(client, webhook.id)
Handling Incoming Webhooks
Fivetrex includes a Plug for Phoenix/Bandit applications that handles signature verification automatically:
# In your Phoenix controller
defmodule MyAppWeb.FivetranWebhookController do
use MyAppWeb, :controller
plug Fivetrex.WebhookPlug,
secret: {MyApp.Config, :fivetran_webhook_secret, []}
def receive(conn, _params) do
event = conn.assigns.fivetran_event
case event.event do
"sync_end" -> handle_sync_completion(event)
"sync_start" -> handle_sync_start(event)
_ -> :ok
end
json(conn, %{status: "ok"})
end
end
For manual signature verification:
# Verify webhook signature
case Fivetrex.WebhookSignature.verify(raw_body, signature, secret) do
:ok -> process_webhook(payload)
{:error, :invalid_signature} -> reject_request()
{:error, :missing_signature} -> reject_request()
end
Schema Metadata
Query and configure which schemas, tables, and columns are synced.
# Get schema configuration for a connector
{:ok, config} = Fivetrex.Connectors.get_schema_config(client, "connector_id")
# Iterate through schemas and tables
for {schema_name, schema} <- config.schemas, schema.enabled do
IO.puts("Schema: #{schema_name}")
for {table_name, table} <- schema.tables, table.enabled do
IO.puts(" Table: #{table_name} (#{table.sync_mode})")
end
end
# Get columns for a specific table
{:ok, columns} = Fivetrex.Connectors.get_table_columns(
client,
"connector_id",
"schema_name",
"table_name"
)
# Update schema configuration
{:ok, updated} = Fivetrex.Connectors.update_schema_config(client, "connector_id", %{
schemas: %{
"public" => %{
enabled: true,
tables: %{
"users" => %{enabled: true},
"logs" => %{enabled: false}
}
}
}
})
# Reload schema (detect new tables/columns from source)
{:ok, config} = Fivetrex.Connectors.reload_schema_config(client, "connector_id")
Sync Status and Frequency
# Get current sync status
{:ok, status} = Fivetrex.Connectors.get_sync_status(client, "connector_id")
if Fivetrex.Models.SyncStatus.syncing?(status) do
IO.puts("Sync in progress...")
end
IO.puts("Last successful sync: #{status.succeeded_at}")
# Set sync frequency (in minutes)
{:ok, connector} = Fivetrex.Connectors.set_sync_frequency(client, "connector_id", 60)
Error Handling
All API functions return {:ok, result} on success or
{:error, %Fivetrex.Error{}} on failure. Errors are structured for easy pattern
matching:
case Fivetrex.Connectors.get(client, "connector_id") do
{:ok, connector} ->
# Success - connector is a %Fivetrex.Models.Connector{}
IO.puts("Found connector: #{connector.id}")
{:error, %Fivetrex.Error{type: :not_found}} ->
# 404 - Resource doesn't exist
IO.puts("Connector not found")
{:error, %Fivetrex.Error{type: :unauthorized}} ->
# 401 - Invalid API credentials
IO.puts("Check your API key and secret")
{:error, %Fivetrex.Error{type: :rate_limited, retry_after: seconds}} ->
# 429 - Too many requests
IO.puts("Rate limited, retry after #{seconds} seconds")
Process.sleep(seconds * 1000)
# Retry...
{:error, %Fivetrex.Error{type: :server_error, status: status}} ->
# 5xx - Fivetran server error
IO.puts("Server error: #{status}")
{:error, %Fivetrex.Error{message: message}} ->
# Catch-all for other errors
IO.puts("Error: #{message}")
end
Error Types
| Type | HTTP Status | Description |
|---|---|---|
| :unauthorized | 401 | Invalid or missing API credentials |
| :not_found | 404 | Resource does not exist |
| :rate_limited | 429 | Too many requests (check retry_after) |
| :server_error | 5xx | Fivetran server error |
| :unknown | Other | Unexpected error |
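Fivetrex already retries rate limits and transient errors internally, but if an application wants its own retry policy on top of the error types above, a helper along these lines is one option. This is a sketch under assumptions: `RetrySketch`, `with_retry/2`, and the `:max_attempts` and `:base_delay_ms` options are hypothetical names, not part of Fivetrex. It matches on plain map patterns, which also match structs carrying the same `type` and `retry_after` fields.

```elixir
defmodule RetrySketch do
  @moduledoc false

  # Calls the zero-arity `fun` until it returns something other than a
  # retryable error, or until `:max_attempts` calls have been made.
  def with_retry(fun, opts \\ []) when is_function(fun, 0) do
    max_attempts = Keyword.get(opts, :max_attempts, 3)
    base_delay_ms = Keyword.get(opts, :base_delay_ms, 500)
    attempt(fun, 1, max_attempts, base_delay_ms)
  end

  defp attempt(fun, n, max, base_ms) do
    case fun.() do
      # Rate limited: wait for the server-suggested number of seconds.
      {:error, %{type: :rate_limited, retry_after: seconds}}
      when n < max and is_integer(seconds) ->
        Process.sleep(seconds * 1000)
        attempt(fun, n + 1, max, base_ms)

      # Server error: exponential backoff (base, 2x base, 4x base, ...).
      {:error, %{type: :server_error}} when n < max ->
        Process.sleep(base_ms * Integer.pow(2, n - 1))
        attempt(fun, n + 1, max, base_ms)

      # Success, a non-retryable error, or attempts exhausted: return as-is.
      other ->
        other
    end
  end
end
```

A call site might look like `RetrySketch.with_retry(fn -> Fivetrex.Connectors.get(client, "connector_id") end, max_attempts: 5)`. Errors such as `:not_found` or `:unauthorized` are returned immediately, since retrying them would not change the outcome.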
API Reference
Groups
| Function | Description |
|---|---|
| Fivetrex.Groups.list/2 | List all groups with pagination |
| Fivetrex.Groups.stream/2 | Stream all groups (handles pagination) |
| Fivetrex.Groups.get/2 | Get a group by ID |
| Fivetrex.Groups.create/2 | Create a new group |
| Fivetrex.Groups.update/3 | Update a group |
| Fivetrex.Groups.delete/2 | Delete a group |
Connectors
Destinations
| Function | Description |
|---|---|
| Fivetrex.Destinations.get/2 | Get a destination by ID |
| Fivetrex.Destinations.create/2 | Create a new destination |
| Fivetrex.Destinations.update/3 | Update a destination |
| Fivetrex.Destinations.delete/2 | Delete a destination |
| Fivetrex.Destinations.test/2 | Run destination connection tests |
Webhooks
| Function | Description |
|---|---|
| Fivetrex.Webhooks.list/2 | List all webhooks |
| Fivetrex.Webhooks.stream/2 | Stream all webhooks |
| Fivetrex.Webhooks.get/2 | Get a webhook by ID |
| Fivetrex.Webhooks.create_account/2 | Create an account-level webhook |
| Fivetrex.Webhooks.create_group/3 | Create a group-level webhook |
| Fivetrex.Webhooks.update/3 | Update a webhook |
| Fivetrex.Webhooks.delete/2 | Delete a webhook |
| Fivetrex.Webhooks.test/2 | Send a test event to a webhook |
Webhook Handling
| Function/Module | Description |
|---|---|
| Fivetrex.WebhookPlug | Plug for Phoenix webhook endpoints |
| Fivetrex.WebhookSignature.verify/3 | Verify webhook signature |
| Fivetrex.WebhookSignature.compute_signature/2 | Compute HMAC-SHA256 signature |
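For readers unfamiliar with HMAC-SHA256 signature checks, the sketch below shows the general technique using Erlang's `:crypto` module. It is not Fivetrex's source code: `SignatureSketch` and its functions are hypothetical, and the hex encoding of the signature (compared case-insensitively here) is an assumption about the wire format. The return values mirror the `:ok`, `{:error, :invalid_signature}`, and `{:error, :missing_signature}` shapes used earlier in this README.

```elixir
defmodule SignatureSketch do
  @moduledoc false
  import Bitwise, only: [bor: 2, bxor: 2]

  # HMAC-SHA256 of the raw request body, keyed with the shared secret,
  # encoded as uppercase hex (the encoding is an assumption).
  def compute(raw_body, secret) do
    :crypto.mac(:hmac, :sha256, secret, raw_body)
    |> Base.encode16(case: :upper)
  end

  # No signature was supplied with the request.
  def verify(_raw_body, nil, _secret), do: {:error, :missing_signature}

  def verify(raw_body, signature, secret) when is_binary(signature) do
    expected = compute(raw_body, secret)

    if secure_compare(expected, String.upcase(signature)) do
      :ok
    else
      {:error, :invalid_signature}
    end
  end

  # Compares every byte regardless of where the first mismatch occurs,
  # so the comparison time does not reveal how much of the signature matched.
  defp secure_compare(a, b) when byte_size(a) == byte_size(b) do
    Enum.zip(:binary.bin_to_list(a), :binary.bin_to_list(b))
    |> Enum.reduce(0, fn {x, y}, acc -> bor(acc, bxor(x, y)) end)
    |> Kernel.==(0)
  end

  defp secure_compare(_a, _b), do: false
end
```

The signature must be computed over the raw request body exactly as received. Re-encoding parsed JSON can change whitespace or key order and produce a different digest.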
Configuration
Runtime Configuration
Fivetrex is designed for runtime configuration. Create clients with credentials read at runtime rather than baked in at compile time:
# In your application code
defmodule MyApp.Fivetran do
def client do
Fivetrex.client(
api_key: Application.get_env(:my_app, :fivetran_api_key),
api_secret: Application.get_env(:my_app, :fivetran_api_secret)
)
end
end
# In config/runtime.exs
config :my_app,
fivetran_api_key: System.get_env("FIVETRAN_API_KEY"),
fivetran_api_secret: System.get_env("FIVETRAN_API_SECRET")
Testing with Custom Base URL
For testing, you can override the base URL:
client = Fivetrex.client(
api_key: "test",
api_secret: "test",
base_url: "http://localhost:4000"
)
Development
# Run all checks (format, credo, compile, test)
mix precommit
# Run CI checks (check-formatted, credo, compile, test + integration)
mix ci
# Run tests
mix test
# Run tests with coverage
mix test --cover
Fivetrex uses Bypass for unit tests. Integration
tests run against the real Fivetran API and require credentials in .env.
Documentation
Generate documentation locally:
mix docs
open doc/index.html
License
MIT License. See LICENSE for details.