Snowflex (Snowflex v1.1.0)
Snowflex is an Ecto adapter for Snowflake using Snowflake's SQL API.
This adapter implements the following Ecto behaviours:
- `Ecto.Adapter` - Core adapter functionality
- `Ecto.Adapter.Queryable` - Query execution and streaming
- `Ecto.Adapter.Schema` - Schema operations (insert, update, delete)
Requirements
If using the provided Snowflex.Transport.Http transport, the only currently supported authentication method is keypair.
In order to obtain the public_key_fingerprint, please follow Snowflake's instructions.
Configuration
Configure the adapter in your application:
The adapter supports configurable transport implementations through the :transport option.
By default, it uses Snowflex.Transport.Http for REST API communication with Snowflake.
```elixir
config :my_app, MyApp.Repo,
  adapter: Snowflex,
  transport: Snowflex.Transport.Http, # Optional, defaults to Http
  # Additional options passed to the transport
  account_name: "your-account",
  username: "your_username",
  private_key_path: "path/to/key.pem",
  public_key_fingerprint: "your_fingerprint"
```

You may supply other transports that conform to the `Snowflex.Transport` behaviour.
For additional configuration options of the provided Snowflex.Transport.Http transport, see its documentation.
Query Tagging
All queries can be tagged for better observability and tracking in Snowflake. Tags are passed as options to any Repo function call:
```elixir
# Tag a query with a UUID
MyRepo.all(query, query_tag: Ecto.UUID.generate())

# Tag a query with a custom identifier
MyRepo.insert(changeset, query_tag: "user_registration_abc")
```

Query tags are visible in Snowflake's query history and can be used for:
- Tracking query origins
- Monitoring specific operations
- Debugging performance issues
- Auditing database access
Error Logging and Metadata
Snowflex automatically enriches error logs with contextual metadata to help with debugging and monitoring. When queries fail, detailed information about the error context is added to Logger metadata.
Available Metadata Keys
The following metadata keys are set when errors occur:
- `:snowflex_account_name` - Your Snowflake account identifier
- `:snowflex_username` - The username used for the connection
- `:snowflex_query_id` - Snowflake's unique query identifier
- `:snowflex_statement` - The SQL statement that failed
- `:snowflex_warehouse` - The default warehouse used for the query (if configured)
- `:snowflex_role` - The default role used for the query (if configured)
- `:snowflex_database` - The default database used for the query (if configured)
- `:snowflex_schema` - The default schema used for the query (if configured)
Configuring Logger
To capture this metadata in your logs, see Logger's Metadata Section for more information on how to configure Logger.
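As a sketch, a console-backend configuration that surfaces these keys might look like the following; the key list simply mirrors the metadata documented above, and the format string is one common choice:

```elixir
# config/config.exs
# A minimal sketch: expose the Snowflex error metadata keys in console logs.
config :logger, :console,
  format: "$time $metadata[$level] $message\n",
  metadata: [
    :snowflex_account_name,
    :snowflex_username,
    :snowflex_query_id,
    :snowflex_statement,
    :snowflex_warehouse,
    :snowflex_role,
    :snowflex_database,
    :snowflex_schema
  ]
```

You can also pass `metadata: :all` during development, at the cost of noisier logs.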
Example Error Log
With metadata configured, error logs will include context like:
```
snowflex_account_name=my_account snowflex_username=my_user snowflex_warehouse=MY_WH snowflex_database=MY_DB snowflex_statement=SELECT * FROM users [error] Snowflake query failed: [529] "Server too busy. Please retry."
```

This enriched logging makes it easier to:
- Track which account/warehouse/role errors are occurring in
- Correlate errors with specific SQL statements
- Debug issues across different Snowflake environments
- Monitor query failures in production
Type Support
The adapter supports the following type conversions:
From Snowflake to Ecto
- `:integer` - Integer values
- `:decimal` - Decimal values
- `:float` - Float values
- `:date` - Date values
- `:time` - Time values
- `:utc_datetime` - UTC datetime values
- `:naive_datetime` - Naive datetime values
- `:binary` - Binary data (as hex strings)
- `:map` - JSON/VARIANT data
From Ecto to Snowflake
- `:binary` - Binary data (as hex strings)
- `:decimal` - Decimal values
- `:float` - Float values
- `:date` - Date values
- `:time` - Time values
- `:utc_datetime` - UTC datetime values
- `:naive_datetime` - Naive datetime values
- `:map` - JSON/VARIANT data
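For illustration, a hypothetical schema exercising several of these conversions could look like the following (the table and field names are invented for this example):

```elixir
defmodule MyApp.Event do
  use Ecto.Schema

  schema "events" do
    field :amount, :decimal            # Snowflake NUMBER <-> Ecto :decimal
    field :payload, :map               # Snowflake VARIANT/JSON <-> Ecto :map
    field :occurred_at, :utc_datetime  # Snowflake TIMESTAMP_TZ in UTC
    field :raw_body, :binary           # transferred as a hex string
  end
end
```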
Limitations/Considerations
Transactions
Snowflex does not support multi-statement transactions, because the Snowflake SQL API does not support multi-request transactions: all statements in a transaction must be sent in the same request. Since it is a common pattern to rely on the results of an earlier statement in later queries within the same transaction (e.g. Ecto.Multi), this limitation in the SQL API meant we could either provide a potentially unintuitive API or not support transactions at all.
Multiple Statements
Snowflex supports submitting multiple statements in the same query and will return the results of each statement packed into an array.
This can be useful when the statements you want to execute need to occur inside the same transaction (e.g. you need to leverage a temporary table).
This is possible using both the query and query_many functions.
```elixir
iex> Repo.query("SELECT 1; SELECT 2;")
{:ok,
 [
   %Snowflex.Result{rows: [[1]]},
   %Snowflex.Result{rows: [[2]]}
 ]}
```

Streaming
When streaming rows using Snowflex.Transport.Http, keep in mind that Snowflake dictates the number of partitions returned. This is different from a row-oriented wire protocol like Postgrex's, where the stream iterates one row at a time.
Internally we utilize the same Stream modules as other implementations, but because the size of each tranche of results is determined externally to your app, memory usage will be higher than if rows were brought back one at a time.
Migrations
Migrations are not currently supported by Snowflex.
Migration from ODBC
Previous versions of Snowflex relied on Erlang's :odbc. While very stable and battle-tested, :odbc has always suffered from the idiosyncrasies inherent in ODBC, as well as limitations on the Snowflake side.
This V1.0 implementation removes support for ODBC, and instead relies solely on Snowflake's SQL API.
If you want to progressively migrate to the Ecto implementation, these tips might be helpful:
- Remove all `Snowflex.*_param()`-wrapped functions.
- In your Snowflake Repo, add a declaration similar to the following:
```elixir
defmodule MyApp.Snowflake do
  use Ecto.Repo, otp_app: :my_app, adapter: Snowflex

  def sql_query(query) do
    execute(query)
  end

  def param_query(query, params \\ %{}) do
    execute(query, params)
  end

  defp execute(query, params \\ %{}) do
    query
    |> query_many!(params) # provided by `Ecto.Repo`
    |> process_results()
    |> unwrap_single_result()
  end

  defp process_results([]), do: []

  defp process_results([%Snowflex.Result{} = result | rest]),
    do: [unpack_snowflex_result(result) | process_results(rest)]

  defp process_results([other | rest]), do: [other | process_results(rest)]

  defp unpack_snowflex_result(%{columns: nil, num_rows: num_rows}), do: {:updated, num_rows}

  defp unpack_snowflex_result(%{columns: columns, rows: rows})
       when is_list(columns) and is_list(rows) do
    headers = Enum.map(columns, &(to_string(&1) |> String.downcase()))

    rows
    |> Enum.map(fn row ->
      Enum.zip(headers, row) |> Map.new()
    end)
  end

  # If there's just one result, unwrap it
  defp unwrap_single_result([result]), do: result
  defp unwrap_single_result(results), do: results
end
```

Any references to Snowflex for `sql_query` and `param_query` can then be replaced with `MyApp.Snowflake`.
To replace the previous functionality of cast_results, we would recommend that you leverage a schemaless changeset.
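As a sketch of that approach, a schemaless changeset over the lowercase-keyed row maps produced by `unpack_snowflex_result/1` above might look like this; the field names and types here are hypothetical and should match your query's columns:

```elixir
defmodule MyApp.Snowflake.UserRow do
  import Ecto.Changeset

  # Hypothetical column names/types; adjust to your query's result set.
  @types %{id: :integer, email: :string, inserted_at: :utc_datetime}

  # Cast one string-keyed row map into typed data, returning
  # {:ok, map} or {:error, changeset}.
  def from_row(row) when is_map(row) do
    {%{}, @types}
    |> cast(row, Map.keys(@types))
    |> apply_action(:cast)
  end
end
```

Rows that fail casting surface as `{:error, changeset}`, which gives you the same validation errors you would get from a schema-backed changeset.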
Testing in your Project
When running tests locally, it is often helpful to avoid hitting Snowflake so you don't incur unnecessary compute/storage costs.
See Snowflex.MigrationGenerator for a strategy to use a local DB implementation when running unit tests, while still using Snowflake in dev/prod environments.