Fivetrex.Connectors (Fivetrex v0.2.1)
View SourceFunctions for managing Fivetran Connectors.
A Connector is the core operational entity in Fivetran, representing the pipe between a data source (e.g., Salesforce, PostgreSQL, Google Ads) and a destination warehouse. This module provides functions for CRUD operations as well as sync control.
Overview
Connectors handle the actual data movement. Each connector:
- Belongs to a single group
- Connects to a specific data source type (service)
- Has configuration specific to that service type
- Maintains sync state and schedule
Connector States
Connectors have various states tracked in the status field:
"scheduled"- Waiting for next sync"syncing"- Currently syncing data"paused"- Manually paused"rescheduled"- Sync was rescheduled
Use helper functions on Fivetrex.Models.Connector to check state:
Connector.syncing?(connector) # => true/false
Connector.paused?(connector) # => true/falseCommon Operations
List Connectors in a Group
{:ok, %{items: connectors}} = Fivetrex.Connectors.list(client, "group_id")Get a Connector
{:ok, connector} = Fivetrex.Connectors.get(client, "connector_id")Trigger a Sync
{:ok, _} = Fivetrex.Connectors.sync(client, "connector_id")Pause/Resume
{:ok, _} = Fivetrex.Connectors.pause(client, "connector_id")
{:ok, _} = Fivetrex.Connectors.resume(client, "connector_id")Dangerous Operations
The resync!/3 function triggers a historical resync, which wipes all synced
data and re-imports from scratch. This can be expensive and time-consuming.
It requires explicit confirmation:
{:ok, _} = Fivetrex.Connectors.resync!(client, "connector_id", confirm: true)See Also
Fivetrex.Models.Connector- The Connector struct with helper functionsFivetrex.Groups- Managing the parent groups
Summary
Types
Result of a sync operation.
Functions
Creates a new connector.
Deletes a connector.
Gets a connector by its ID.
Gets the schema configuration for a connector.
Gets the current state of a connector.
Gets a summary of the connector's current sync status.
Gets the columns for a specific table in a connector.
Lists all connectors in a group.
Pauses a connector.
Reloads the schema configuration from the source.
Resumes a paused connector.
Triggers a historical resync for a connector.
Sets the sync frequency for a connector.
Returns a stream of all connectors in a group, handling pagination automatically.
Triggers an incremental sync for a connector.
Updates an existing connector.
Updates the schema configuration for a connector.
Types
Functions
@spec create(Fivetrex.Client.t(), map()) :: {:ok, Fivetrex.Models.Connector.t()} | {:error, Fivetrex.Error.t()}
Creates a new connector.
The connector configuration is highly dependent on the service type. See Fivetran's documentation for service-specific configuration options.
Parameters
client- The Fivetrex clientparams- A map containing::group_id- Required. The group to create the connector in.:service- Required. The connector type (e.g., "postgres", "salesforce").:config- Required. Service-specific configuration map.:paused- Optional. Start in paused state (default: false).:sync_frequency- Optional. Sync frequency in minutes.
Returns
{:ok, Connector.t()}- The created connector{:error, Fivetrex.Error.t()}- On failure
Examples
Create a PostgreSQL connector:
{:ok, connector} = Fivetrex.Connectors.create(client, %{
group_id: "group_id",
service: "postgres",
config: %{
host: "db.example.com",
port: 5432,
database: "production",
user: "fivetran_user",
password: "secret"
}
})Create a paused connector:
{:ok, connector} = Fivetrex.Connectors.create(client, %{
group_id: "group_id",
service: "salesforce",
paused: true,
config: %{...}
})Create with Connect Card for OAuth flows:
{:ok, connector} = Fivetrex.Connectors.create(client, %{
group_id: "group_id",
service: "google_analytics_4",
connect_card_config: %{
redirect_uri: "https://your.site/callback",
hide_setup_guide: false
}
})
# connector.connect_card will contain:
# %{
# "token" => "eyJ0eXAiOiJKV1QiLCJh...",
# "uri" => "https://fivetran.com/connect-card/setup?auth=..."
# }
redirect_url = connector.connect_card["uri"]
@spec delete(Fivetrex.Client.t(), String.t()) :: :ok | {:error, Fivetrex.Error.t()}
Deletes a connector.
Warning: This permanently deletes the connector and all its sync history. The synced data in your destination is not affected.
Parameters
client- The Fivetrex clientconnector_id- The ID of the connector to delete
Returns
:ok- On successful deletion{:error, Fivetrex.Error.t()}- On failure
Examples
:ok = Fivetrex.Connectors.delete(client, "old_connector_id")
@spec get(Fivetrex.Client.t(), String.t()) :: {:ok, Fivetrex.Models.Connector.t()} | {:error, Fivetrex.Error.t()}
Gets a connector by its ID.
Parameters
client- The Fivetrex clientconnector_id- The unique identifier of the connector
Returns
{:ok, Connector.t()}- The connector{:error, Fivetrex.Error.t()}- On failure
Examples
{:ok, connector} = Fivetrex.Connectors.get(client, "connector_id")
IO.puts("Service: #{connector.service}")
IO.puts("Syncing: #{Connector.syncing?(connector)}")
@spec get_schema_config(Fivetrex.Client.t(), String.t()) :: {:ok, Fivetrex.Models.SchemaConfig.t()} | {:error, Fivetrex.Error.t()}
Gets the schema configuration for a connector.
Returns the current schema, table, and column configuration including enabled/disabled states and sync modes.
Parameters
client- The Fivetrex clientconnector_id- The ID of the connector
Returns
{:ok, SchemaConfig.t()}- The schema configuration{:error, Fivetrex.Error.t()}- On failure
Note
Only explicitly configured (non-default) columns are returned in this response.
For a complete column list, use get_table_columns/4.
Examples
{:ok, config} = Fivetrex.Connectors.get_schema_config(client, "connector_id")
# Iterate through schemas and tables
for {schema_name, schema} <- config.schemas, schema.enabled do
IO.puts("Schema: #{schema_name}")
for {table_name, table} <- schema.tables, table.enabled do
IO.puts(" Table: #{table_name} (sync_mode: #{table.sync_mode})")
end
end
@spec get_state(Fivetrex.Client.t(), String.t()) :: {:ok, map()} | {:error, Fivetrex.Error.t()}
Gets the current state of a connector.
Returns detailed sync state information including cursor positions, which can be useful for debugging sync issues.
Parameters
client- The Fivetrex clientconnector_id- The ID of the connector
Returns
{:ok, map()}- The connector state as a raw map{:error, Fivetrex.Error.t()}- On failure
Examples
{:ok, state} = Fivetrex.Connectors.get_state(client, "connector_id")
IO.inspect(state["state"])
@spec get_sync_status(Fivetrex.Client.t(), String.t()) :: {:ok, Fivetrex.Models.SyncStatus.t()} | {:error, Fivetrex.Error.t()}
Gets a summary of the connector's current sync status.
Returns a structured view of the connector's sync state including last success/failure times. For detailed sync history, configure Fivetran's Log Service to send logs to your data warehouse.
Parameters
client- The Fivetrex clientconnector_id- The ID of the connector
Returns
{:ok, SyncStatus.t()}- A struct containing::sync_state- Current state (e.g.,"syncing","scheduled"):succeeded_at- Last successful sync timestamp:failed_at- Last failed sync timestamp:is_historical_sync- Whether a historical sync is in progress:update_state- Update status
{:error, Fivetrex.Error.t()}- On failure
Examples
{:ok, status} = Fivetrex.Connectors.get_sync_status(client, "connector_id")
IO.puts("Current state: #{status.sync_state}")
IO.puts("Last success: #{status.succeeded_at}")
if SyncStatus.syncing?(status) do
IO.puts("Sync in progress...")
end
@spec get_table_columns(Fivetrex.Client.t(), String.t(), String.t(), String.t()) :: {:ok, %{required(String.t()) => Fivetrex.Models.Column.t()}} | {:error, Fivetrex.Error.t()}
Gets the columns for a specific table in a connector.
Returns the complete column list for a table, including columns using
default settings (which may be omitted from get_schema_config/2).
Parameters
client- The Fivetrex clientconnector_id- The ID of the connectorschema_name- The source schema nametable_name- The source table name
Returns
{:ok, %{String.t() => Column.t()}}- Map of column name to Column struct{:error, Fivetrex.Error.t()}- On failure
Examples
{:ok, columns} = Fivetrex.Connectors.get_table_columns(
client,
"connector_id",
"public",
"users"
)
# Find primary key columns
primary_keys =
columns
|> Enum.filter(fn {_name, col} -> col.is_primary_key end)
|> Enum.map(fn {name, _col} -> name end)
# Find hashed columns
hashed =
columns
|> Enum.filter(fn {_name, col} -> col.hashed end)
|> Enum.map(fn {name, _col} -> name end)
@spec list(Fivetrex.Client.t(), String.t(), keyword()) :: {:ok, %{items: [Fivetrex.Models.Connector.t()], next_cursor: String.t() | nil}} | {:error, Fivetrex.Error.t()}
Lists all connectors in a group.
Returns a paginated list of connectors belonging to the specified group.
Parameters
client- The Fivetrex clientgroup_id- The ID of the group to list connectors fromopts- Optional keyword list::cursor- Pagination cursor from a previous response:limit- Maximum items per page (max 1000)
Returns
{:ok, %{items: [Connector.t()], next_cursor: String.t() | nil}}- Success{:error, Fivetrex.Error.t()}- On failure
Examples
{:ok, %{items: connectors, next_cursor: cursor}} =
Fivetrex.Connectors.list(client, "group_id")
# Check connector states
syncing = Enum.filter(connectors, &Connector.syncing?/1)
@spec pause(Fivetrex.Client.t(), String.t()) :: {:ok, Fivetrex.Models.Connector.t()} | {:error, Fivetrex.Error.t()}
Pauses a connector.
A paused connector will not sync until resumed. This is a convenience
function that calls update/3 with paused: true.
Parameters
client- The Fivetrex clientconnector_id- The ID of the connector to pause
Returns
{:ok, Connector.t()}- The paused connector{:error, Fivetrex.Error.t()}- On failure
Examples
{:ok, connector} = Fivetrex.Connectors.pause(client, "connector_id")
true = Connector.paused?(connector)
@spec reload_schema_config(Fivetrex.Client.t(), String.t(), keyword()) :: {:ok, Fivetrex.Models.SchemaConfig.t()} | {:error, Fivetrex.Error.t()}
Reloads the schema configuration from the source.
This fetches the latest schema from the data source and updates the connector's schema configuration with any new schemas, tables, or columns. This can be slow for large schemas.
Parameters
client- The Fivetrex clientconnector_id- The ID of the connectoropts- Optional keyword list::exclude_mode- How to handle newly discovered items:"PRESERVE"(default) - Keep existing enabled/disabled settings"INCLUDE"- Enable all new schemas and tables"EXCLUDE"- Disable all new schemas and tables
Returns
{:ok, SchemaConfig.t()}- The reloaded schema configuration{:error, Fivetrex.Error.t()}- On failure
Examples
Reload with default settings:
{:ok, config} = Fivetrex.Connectors.reload_schema_config(client, "connector_id")Reload and enable all new items:
{:ok, config} = Fivetrex.Connectors.reload_schema_config(
client,
"connector_id",
exclude_mode: "INCLUDE"
)Reload and disable all new items:
{:ok, config} = Fivetrex.Connectors.reload_schema_config(
client,
"connector_id",
exclude_mode: "EXCLUDE"
)
@spec resume(Fivetrex.Client.t(), String.t()) :: {:ok, Fivetrex.Models.Connector.t()} | {:error, Fivetrex.Error.t()}
Resumes a paused connector.
This is a convenience function that calls update/3 with paused: false.
The connector will begin syncing according to its schedule.
Parameters
client- The Fivetrex clientconnector_id- The ID of the connector to resume
Returns
{:ok, Connector.t()}- The resumed connector{:error, Fivetrex.Error.t()}- On failure
Examples
{:ok, connector} = Fivetrex.Connectors.resume(client, "connector_id")
false = Connector.paused?(connector)
@spec resync!(Fivetrex.Client.t(), String.t(), keyword()) :: {:ok, map()} | {:error, Fivetrex.Error.t()}
Triggers a historical resync for a connector.
WARNING: This is a destructive operation!
A historical resync:
- Wipes all of the connector's sync state
- Re-imports ALL data from the source from scratch
- Can take a very long time for large data sources
- May incur significant costs (both Fivetran and source API costs)
The confirm: true option is required to prevent accidental invocation.
Parameters
client- The Fivetrex clientconnector_id- The ID of the connector to resyncopts- Keyword list::confirm- Required. Must betrueto confirm the operation.
Returns
{:ok, map()}- Resync triggered successfully{:error, Fivetrex.Error.t()}- On failure
Raises
ArgumentError- Ifconfirm: trueis not provided
Examples
# This will raise ArgumentError:
Fivetrex.Connectors.resync!(client, "connector_id", [])
# This works:
{:ok, _} = Fivetrex.Connectors.resync!(client, "connector_id", confirm: true)
@spec set_sync_frequency(Fivetrex.Client.t(), String.t(), pos_integer(), keyword()) :: {:ok, Fivetrex.Models.Connector.t()} | {:error, Fivetrex.Error.t()}
Sets the sync frequency for a connector.
A convenience function for updating sync timing configuration.
Parameters
client- The Fivetrex clientconnector_id- The ID of the connectorfrequency_minutes- Sync frequency in minutesopts- Optional keyword list::schedule_type-"auto"or"manual":daily_sync_time- Time for daily syncs (e.g.,"14:00")
Returns
{:ok, Connector.t()}- The updated connector{:error, Fivetrex.Error.t()}- On failure
Examples
Set to sync every 60 minutes:
{:ok, connector} = Fivetrex.Connectors.set_sync_frequency(client, "id", 60)Set daily sync at 2pm UTC:
{:ok, connector} = Fivetrex.Connectors.set_sync_frequency(client, "id", 1440,
schedule_type: "manual",
daily_sync_time: "14:00"
)
@spec stream(Fivetrex.Client.t(), String.t(), keyword()) :: Enumerable.t()
Returns a stream of all connectors in a group, handling pagination automatically.
This is memory-efficient for groups with many connectors.
Parameters
client- The Fivetrex clientgroup_id- The ID of the groupopts- Options passed to eachlist/3call
Returns
An Enumerable.t() yielding %Fivetrex.Models.Connector{} structs.
Examples
# Find all syncing connectors
syncing =
Fivetrex.Connectors.stream(client, "group_id")
|> Stream.filter(&Connector.syncing?/1)
|> Enum.to_list()
# Process connectors one at a time
Fivetrex.Connectors.stream(client, "group_id")
|> Enum.each(&process_connector/1)
@spec sync(Fivetrex.Client.t(), String.t()) :: {:ok, sync_result()} | {:error, Fivetrex.Error.t()}
Triggers an incremental sync for a connector.
This initiates a sync that only processes data that has changed since the last sync. The sync runs asynchronously; this function returns immediately.
Parameters
client- The Fivetrex clientconnector_id- The ID of the connector to sync
Returns
{:ok, sync_result()}- Sync triggered successfully. Returns a map with::success- Alwaystrueon success:message- Optional message from the API:sync_state- Current sync state if available
{:error, Fivetrex.Error.t()}- On failure
Examples
{:ok, %{success: true}} = Fivetrex.Connectors.sync(client, "connector_id")
# With full result inspection
case Fivetrex.Connectors.sync(client, connector_id) do
{:ok, %{success: true, sync_state: state}} ->
IO.puts("Sync triggered, state: #{state}")
{:error, error} ->
IO.puts("Sync failed: #{error.message}")
end
@spec update(Fivetrex.Client.t(), String.t(), map()) :: {:ok, Fivetrex.Models.Connector.t()} | {:error, Fivetrex.Error.t()}
Updates an existing connector.
Parameters
client- The Fivetrex clientconnector_id- The ID of the connector to updateparams- A map with fields to update::paused- Pause or resume the connector:sync_frequency- Sync frequency in minutes:config- Updated configuration (merged with existing)
Returns
{:ok, Connector.t()}- The updated connector{:error, Fivetrex.Error.t()}- On failure
Examples
{:ok, connector} = Fivetrex.Connectors.update(client, "connector_id", %{
paused: true,
sync_frequency: 60
})
@spec update_schema_config(Fivetrex.Client.t(), String.t(), map()) :: {:ok, Fivetrex.Models.SchemaConfig.t()} | {:error, Fivetrex.Error.t()}
Updates the schema configuration for a connector.
Use this to enable/disable schemas, tables, or columns, or to change sync modes and destination names.
Parameters
client- The Fivetrex clientconnector_id- The ID of the connectorparams- A map with configuration updates::schema_change_handling-"ALLOW_ALL","ALLOW_COLUMNS", or"BLOCK_ALL":schemas- Map of schema configurations to update
Returns
{:ok, SchemaConfig.t()}- The updated schema configuration{:error, Fivetrex.Error.t()}- On failure
Examples
Disable a specific table:
{:ok, config} = Fivetrex.Connectors.update_schema_config(client, "connector_id", %{
schemas: %{
"public" => %{
tables: %{
"sensitive_data" => %{enabled: false}
}
}
}
})Hash a column for privacy:
{:ok, config} = Fivetrex.Connectors.update_schema_config(client, "connector_id", %{
schemas: %{
"public" => %{
tables: %{
"users" => %{
columns: %{
"email" => %{hashed: true}
}
}
}
}
}
})Change schema change handling:
{:ok, config} = Fivetrex.Connectors.update_schema_config(client, "connector_id", %{
schema_change_handling: "BLOCK_ALL"
})