Blink behaviour (blink v0.4.1)

Blink provides an efficient way to seed large amounts of data into your database.

Overview

Blink simplifies database seeding by providing a structured way to build and insert records:

  1. Create an empty Store.
  2. Assign the records you want to insert to each database table.
  3. Bulk-insert the records into your database.

Stores

Stores are the central data unit in Blink. A Store is a struct that holds the records you want to seed, along with any contextual data you need during the seeding process but do not want to insert into the database.

A Store struct contains the keys tables and context:

Blink.Store{
  tables: %{
    "table_name" => [...]
  },
  context: %{
    "key" => [...]
  }
}
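For example, a populated store holding user records and a list of post ids might look like this (illustrative data):

```elixir
%Blink.Store{
  tables: %{
    # Records destined for the "users" database table.
    "users" => [
      %{id: 1, name: "Alice", email: "alice@example.com"},
      %{id: 2, name: "Bob", email: "bob@example.com"}
    ]
  },
  context: %{
    # Helper data used while building seeds; never inserted into the database.
    "post_ids" => [1, 2, 3]
  }
}
```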

All keys in tables must match the name of a table in your database. Table names can be either atoms or strings.

Tables

A mapping of table names to lists of records. These records are persisted to the database when insert/2 or insert/3 is called.

Context

Stores arbitrary data needed during the seeding process. This data is available when building your seeds but is not inserted into the database by insert/2 or insert/3.
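For example, a table/2 callback can read previously added context when building its records. This is a sketch, assuming the context map is reachable as store.context and that the "post_ids" key was added with add_context/2 before the "posts" table is built:

```elixir
def table(store, "posts") do
  # Build one post per id stored under the "post_ids" context key.
  # The ids themselves are never inserted; only these post records are.
  for id <- store.context["post_ids"] do
    %{id: id, title: "Post #{id}", user_id: 1}
  end
end
```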

Basic Usage

To seed your database with Blink, follow these three steps:

  • Create: Initialize an empty store with new/0.

  • Build: Add seed data with add_table/2 and context data with add_context/2.

  • Insert: Persist records to the database with insert/2 or insert/3.

Example

defmodule MyApp.Seeder do
  use Blink

  def call do
    new()
    |> add_table("users")
    |> add_context("post_ids")
    |> insert(MyApp.Repo, batch_size: 1_200)
  end

  def table(_store, "users") do
    [
      %{id: 1, name: "Alice", email: "alice@example.com"},
      %{id: 2, name: "Bob", email: "bob@example.com"}
    ]
  end

  def context(_store, "post_ids") do
    [1, 2, 3]
  end
end

Custom Logic for Inserting Records

The functions insert/2 and insert/3 bulk-insert the table records of a Store into a Postgres database using Postgres' COPY command. You can override this default implementation by defining your own insert/2 or insert/3 function in your Blink module, which lets you seed databases other than Postgres.
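For instance, here is a sketch of an insert/3 override that seeds any Ecto-supported database with Ecto.Repo.insert_all/2 instead of COPY. It assumes each table's records are plain maps sharing the same keys:

```elixir
def insert(store, repo, opts) do
  batch_size = Keyword.get(opts, :batch_size, 900)

  for {table_name, records} <- store.tables do
    records
    |> Enum.chunk_every(batch_size)
    |> Enum.each(fn batch -> repo.insert_all(to_string(table_name), batch) end)
  end

  {:ok, store}
end
```

Returning {:ok, store} satisfies the callback's {:ok, any()} | {:error, any()} contract.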

Summary

Callbacks

context(store, key) - Builds and returns the data to be stored under a context key in the given Store.

insert(store, repo) - Specifies how to perform a bulk insert of the seed data from a Store into the given Ecto repository.

table(store, table_name) - Builds and returns the records to be stored under a table key in the given Store.

Functions

copy_to_table(items, table_name, repo, opts \\ []) - Copies a list of items into a database table using database-specific bulk copy commands.

from_csv(path, opts \\ []) - Reads a CSV file and returns a list of maps suitable for use in table/2 callbacks.

from_json(path, opts \\ []) - Reads a JSON file and returns a list of maps suitable for use in table/2 callbacks.

Callbacks

context(store, key)

(optional)
@callback context(store :: Blink.Store.t(), key :: Blink.Store.key()) :: [map()]

Builds and returns the data to be stored under a context key in the given Store.

The callback context/2 is called internally by add_context/2, which passes the given context key along. Therefore, each key passed to add_context/2 must have a matching context/2 clause.

insert/2 and insert/3 ignore the :context data and only insert data from :tables.

When no matching context/2 clause is defined, add_context/2 raises an ArgumentError.

insert(store, repo)

(optional)
@callback insert(store :: Blink.Store.t(), repo :: Ecto.Repo.t()) ::
  {:ok, any()} | {:error, any()}

Specifies how to perform a bulk insert of the seed data from a Store into the given Ecto repository.

This callback function is optional, since Blink ships with a default implementation for Postgres databases.

insert(store, repo, opts)

(optional)
@callback insert(store :: Blink.Store.t(), repo :: Ecto.Repo.t(), opts :: Keyword.t()) ::
  {:ok, any()} | {:error, any()}

Like insert/2, but also accepts a keyword list of options (for example, :batch_size) that is forwarded to the bulk-insert implementation.

table(store, table_name)

(optional)
@callback table(store :: Blink.Store.t(), table_name :: Blink.Store.key()) :: [map()]

Builds and returns the records to be stored under a table key in the given Store.

The callback table/2 is called internally by add_table/2, which passes the given database table name along. Therefore, each table name passed to add_table/2 must have a matching table/2 clause.

Data added to a store with table/2 is inserted into the corresponding database table when calling insert/2 or insert/3.

When no matching table/2 clause is defined, add_table/2 raises an ArgumentError.

Functions

copy_to_table(items, table_name, repo, opts \\ [])

@spec copy_to_table(
  items :: [map()],
  table_name :: Blink.Store.key(),
  repo :: Ecto.Repo.t(),
  opts :: Keyword.t()
) :: {:ok, any()} | {:error, any()}

Copies a list of items into a database table using database-specific bulk copy commands.

This function provides an efficient way to insert large amounts of data by using database-specific bulk copy commands. Items are streamed to the database in batches to minimize memory usage.

Parameters

  • items - A list of maps where each map represents a row to insert. All maps must have the same keys, which correspond to the table columns.
  • table_name - The name of the table to insert into (string or atom).
  • repo - An Ecto repository module.
  • opts - Keyword list of options:
    • :adapter - The adapter module to use. Defaults to Blink.Adapter.Postgres.
    • :batch_size - Number of rows to send per batch (default: 900)

Returns

  • {:ok, result} - When the copy operation succeeds
  • {:error, reason} - When the copy operation fails

Examples

iex> items = [%{id: 1, name: "Alice"}, %{id: 2, name: "Bob"}]
iex> copy_to_table(items, "users", MyApp.Repo, batch_size: 1000)
{:ok, _result}

# Using a specific adapter
iex> copy_to_table(items, "users", MyApp.Repo, adapter: Some.Custom.Adapter)
{:ok, _result}

Notes

The function assumes all items have the same structure. Column names are extracted from the first item in the list.

Currently only PostgreSQL is supported via Blink.Adapter.Postgres.

from_csv(path, opts \\ [])

@spec from_csv(path :: String.t(), opts :: Keyword.t()) :: [map()]

Reads a CSV file and returns a list of maps suitable for use in table/2 callbacks.

By default, the CSV file must have a header row. Each column header will become a string key in the resulting maps. All values are returned as strings.
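For instance, given a hypothetical users.csv with a header row, each data row becomes a map of string keys to string values:

```elixir
# users.csv:
#   id,name,email
#   1,Alice,alice@example.com
#   2,Bob,bob@example.com

Blink.from_csv("users.csv")
# => [
#   %{"id" => "1", "name" => "Alice", "email" => "alice@example.com"},
#   %{"id" => "2", "name" => "Bob", "email" => "bob@example.com"}
# ]
```

Use the :transform option (shown below) to convert string values such as "1" into integers before insertion.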

Parameters

  • path - Path to the CSV file (relative or absolute)
  • opts - Keyword list of options:
    • :headers - List of header names to use, or :infer to read from first row (default: :infer)
    • :transform - Function to transform each row map (default: identity function)

Examples

# Simple usage with headers in first row
def table(_store, "users") do
  Blink.from_csv("users.csv")
end

# CSV without headers - provide them explicitly
def table(_store, "users") do
  Blink.from_csv("users.csv", headers: ["id", "name", "email"])
end

# With custom transformation for type conversion
def table(_store, "users") do
  Blink.from_csv("users.csv",
    transform: fn row ->
      row
      |> Map.update!("id", &String.to_integer/1)
      |> Map.update!("age", &String.to_integer/1)
    end
  )
end

Returns

A list of maps, where each map represents a row from the CSV file.

from_json(path, opts \\ [])

@spec from_json(path :: String.t(), opts :: Keyword.t()) :: [map()]

Reads a JSON file and returns a list of maps suitable for use in table/2 callbacks.

The JSON file must contain an array of objects at the root level. Each object becomes a map with string keys.
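For instance, a hypothetical users.json containing an array of objects yields one map per object. Unlike from_csv/2, non-string JSON values such as numbers keep their decoded types, assuming a standard JSON decoder:

```elixir
# users.json:
#   [
#     {"id": 1, "name": "Alice"},
#     {"id": 2, "name": "Bob"}
#   ]

Blink.from_json("users.json")
# => [%{"id" => 1, "name" => "Alice"}, %{"id" => 2, "name" => "Bob"}]
```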

Parameters

  • path - Path to the JSON file
  • opts - Keyword list of options:
    • :transform - Function to transform each row map (default: identity function)

Examples

# Simple usage
def table(_store, "users") do
  Blink.from_json("users.json")
end

# With custom transformation for type conversion
def table(_store, "users") do
  Blink.from_json("users.json",
    transform: fn row ->
      row
      |> Map.update!("id", &String.to_integer/1)
      |> Map.update!("age", &String.to_integer/1)
    end
  )
end

Returns

A list of maps, where each map represents an object from the JSON array.