UDFs And Aggregates


The public UDF surface covers listing, registering, and removing packages, single-record UDF execution, batch record UDF execution, background query UDF jobs, and aggregate queries. It provides no UDF package tooling beyond those commands.

Lua Package Example

A UDF package is a .lua file with one or more named functions. Record UDFs receive the target record as the first argument, followed by the arguments passed from Elixir.

-- priv/udf/records.lua

function mark_seen(rec, source)
  local visits = rec["visits"]

  if visits == nil then
    visits = 0
  end

  rec["visits"] = visits + 1
  rec["last_seen_source"] = source

  aerospike:update(rec)

  return rec["visits"]
end

After registering this file as records.lua, call it with package name "records" and function name "mark_seen".
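To reason about what mark_seen returns before running it against a cluster, the same logic can be modelled in plain Elixir. This is only a sketch: MarkSeenModel is a hypothetical module, the record is represented as a plain map of bins, and nothing here touches the server or the Aerospike client.

```elixir
defmodule MarkSeenModel do
  # Hypothetical local model of the Lua mark_seen function above.
  # `bins` stands in for the record; the real UDF mutates the record on the server.
  def mark_seen(bins, source) do
    # A missing "visits" bin is treated as 0, like the nil check in the Lua code.
    visits = Map.get(bins, "visits") || 0

    updated =
      bins
      |> Map.put("visits", visits + 1)
      |> Map.put("last_seen_source", source)

    # The Lua function returns the new visit count; the model also returns the bins.
    {updated, updated["visits"]}
  end
end

{bins, visits} = MarkSeenModel.mark_seen(%{"visits" => 1}, "web")
```

Starting from %{"visits" => 1}, the model yields a count of 2 and sets "last_seen_source" to "web". A record with no "visits" bin yields 1.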

Aggregate UDFs receive a stream instead of one record. The same package can contain record UDF functions and aggregate functions, but aggregate functions must return stream operations.

-- priv/udf/user_stats.lua

local function add_bin(total, rec, bin_name)
  return total + (rec[bin_name] or 0)
end

local function add_age(total, rec)
  return add_bin(total, rec, "age")
end

local function merge_sum(left, right)
  return left + right
end

function sum_age(stream, bin_name)
  return stream : aggregate(0, function(total, rec)
    return add_bin(total, rec, bin_name)
  end) : reduce(merge_sum)
end

function sum_summary(stream)
  return stream : aggregate(0, add_age) : reduce(merge_sum) : map(function(total)
    return map{sum = total, labels = {"total", tostring(total)}}
  end)
end

Package Lifecycle

Upload a Lua package from a readable path or inline source. The server filename usually includes .lua; record and query execution use the package name without that suffix.

server_name = "records.lua"
package = "records"

{:ok, task} =
  Aerospike.register_udf(:aerospike, "priv/udf/records.lua", server_name)

:ok = Aerospike.RegisterTask.wait(task, timeout: 10_000, poll_interval: 200)
{:ok, udfs} = Aerospike.list_udfs(:aerospike)

Enum.find(udfs, &(&1.filename == server_name))

Remove by server filename:

:ok = Aerospike.remove_udf(:aerospike, server_name)
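Because registration and removal use the server filename while execution uses the package name, it can help to derive one from the other instead of maintaining two strings. The following is a minimal sketch using only the Elixir standard library. UdfNames is a hypothetical helper, not part of the Aerospike client.

```elixir
defmodule UdfNames do
  # Hypothetical helper, not part of the Aerospike client.
  # Strips a trailing ".lua" so the result can be used as the package argument.
  # A name without the suffix is returned unchanged.
  def package_name(server_name) do
    Path.rootname(server_name, ".lua")
  end
end

server_name = "records.lua"
package = UdfNames.package_name(server_name)
# package is "records"
```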

Single-Record UDFs

apply_udf/6 executes one function against one record key.

key = Aerospike.key("test", "users", "user:42")

{:ok, _metadata} =
  Aerospike.put(:aerospike, key, %{"visits" => 1})

{:ok, result} =
  Aerospike.apply_udf(:aerospike, key, package, "mark_seen", ["web"])

The helper accepts write-family options such as :timeout, :socket_timeout, :ttl, :generation, :generation_policy, :commit_level, :filter, and :txn. Once a record UDF request is on the wire, transport failures are not retried automatically because server-side effects may already have occurred.
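Options go in the sixth argument as a keyword list. The sketch below shows one way a caller might pass them and surface a failure without retrying. VisitTracker is a hypothetical wrapper, the option values are illustrative, and the {:error, reason} shape is an assumption that should be checked against the client's actual return values.

```elixir
defmodule VisitTracker do
  # Hypothetical wrapper; `conn` and `key` are whatever the caller already uses
  # with Aerospike.apply_udf/6 (for example :aerospike and a key from Aerospike.key/3).
  def mark_seen(conn, key, source) do
    # Option names come from the list above; the values are illustrative only.
    opts = [timeout: 5_000, socket_timeout: 2_000]

    case Aerospike.apply_udf(conn, key, "records", "mark_seen", [source], opts) do
      {:ok, visits} ->
        {:ok, visits}

      # Assumption: failures arrive as {:error, reason}. The call is not retried
      # here because the UDF may already have incremented "visits" on the server.
      {:error, reason} ->
        {:error, {:mark_seen_unconfirmed, reason}}
    end
  end
end
```

Leaving the retry decision to the caller keeps a non-idempotent function such as mark_seen from counting the same visit twice.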

Background Query UDF Jobs

query_udf/6 starts a background job and returns an Aerospike.ExecuteTask.

query =
  Aerospike.Query.new("test", "users")
  |> Aerospike.Query.where(Aerospike.Filter.equal("status", "inactive"))

{:ok, task} =
  Aerospike.query_udf(:aerospike, query, package, "deactivate", [])

:ok = Aerospike.ExecuteTask.wait(task, timeout: 30_000, poll_interval: 500)

Pass node: node_name when the background job should target one active node.
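As a sketch of the node-targeted form, the option can be added to the same call. InactiveUsers is a hypothetical wrapper, and how node_name is obtained is outside the scope of this example.

```elixir
defmodule InactiveUsers do
  # Hypothetical wrapper around the query_udf/6 call shown above.
  # `node_name` must identify an active node; discovering node names is not shown.
  def deactivate_on_node(conn, query, package, node_name) do
    with {:ok, task} <-
           Aerospike.query_udf(conn, query, package, "deactivate", [], node: node_name) do
      # Wait for the background job with the same polling settings as above.
      Aerospike.ExecuteTask.wait(task, timeout: 30_000, poll_interval: 500)
    end
  end
end
```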

Aggregate Streams

query_aggregate/6 returns the partial values emitted by the server. The client does not run local Lua finalization on this path.

server_name = "user_stats.lua"
package = "user_stats"

{:ok, task} =
  Aerospike.register_udf(:aerospike, "priv/udf/user_stats.lua", server_name)

:ok = Aerospike.RegisterTask.wait(task, timeout: 10_000, poll_interval: 200)

query =
  Aerospike.Query.new("test", "users")
  |> Aerospike.Query.where(Aerospike.Filter.range("age", 18, 65))

{:ok, partials} =
  Aerospike.query_aggregate(:aerospike, query, package, "sum_age", ["age"],
    timeout: 10_000
  )

total = Enum.sum(partials)

In this example, the server package was registered as "user_stats.lua", the package argument is "user_stats", the function argument is "sum_age", and ["age"] is passed to the Lua function after the stream.
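The reason the caller sums the partials can be illustrated with a local model of the aggregate and reduce steps. This is a sketch under assumptions: AggregateModel is hypothetical, records are plain maps, and the two-way split stands in for however the server actually divides the work. It does not describe how many partials a real query emits.

```elixir
defmodule AggregateModel do
  # Mirrors the Lua aggregate step: fold records into a running total,
  # counting a missing bin as 0.
  def partial_sum(records, bin_name) do
    Enum.reduce(records, 0, fn rec, total ->
      total + (Map.get(rec, bin_name) || 0)
    end)
  end

  # Mirrors merge_sum in the Lua package.
  def merge_sum(left, right), do: left + right
end

# Two hypothetical groups of records, each producing one partial value.
group_a = [%{"age" => 30}, %{"age" => 41}]
group_b = [%{"age" => 25}, %{"name" => "no age bin"}]

partials = Enum.map([group_a, group_b], &AggregateModel.partial_sum(&1, "age"))
# partials is [71, 25]

total = Enum.reduce(partials, 0, &AggregateModel.merge_sum/2)
# total is 96
```

Because addition is associative and commutative, merging the partials on the client gives the same total regardless of how the records were grouped.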

Finalized Aggregate Results

Use query_aggregate_result/6 when the caller wants one locally finalized result. The local Lua source is required even when the package is already registered on the server.

{:ok, total} =
  Aerospike.query_aggregate_result(
    :aerospike,
    query,
    package,
    "sum_age",
    ["age"],
    source_path: "priv/udf/user_stats.lua",
    timeout: 10_000
  )

Pass exactly one of source: lua_source or source_path: path. A missing source, both source options together, an unreadable file, unsupported local arguments, or a node: node_name option each return an invalid-argument error before the server query is opened.

The registered server package and the local source are related but separate:

  • Register "user_stats.lua" on the server before calling query_aggregate/6 or query_aggregate_result/6.
  • Pass package name "user_stats" and function name "sum_age" to the query call.
  • Pass source_path: "priv/udf/user_stats.lua" or source: lua_source only for query_aggregate_result/6, because local finalization needs the Lua source available to the Elixir client.
  • Keep the local source aligned with the registered package. The client does not fetch Lua source back from the server or infer a path from the package name.
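The source: form is useful when the Lua text is already in memory, for example when it was read once at startup. A minimal sketch follows, assuming source: accepts the Lua file contents as a binary. UserStats is a hypothetical wrapper and the default path matches the examples in this guide.

```elixir
defmodule UserStats do
  # Hypothetical wrapper. Reads the Lua file and passes its contents through
  # the source: option instead of source_path:. Only one of the two is given.
  def total_age(conn, query, lua_path \\ "priv/udf/user_stats.lua") do
    # File.read!/1 raises if the file cannot be read.
    lua_source = File.read!(lua_path)

    Aerospike.query_aggregate_result(conn, query, "user_stats", "sum_age", ["age"],
      source: lua_source,
      timeout: 10_000
    )
  end
end
```

Reading from the same file that was registered on the server is one way to keep the local source aligned with the registered package.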

The local reducer runs in a bounded Lua state. Supported stream helpers are map, filter, aggregate, and reduce; logging helpers are no-ops. Local filesystem, OS, package loading, dynamic loading, debug access, require, groupby, list, bytes, and record/database mutation or lookup helpers fail explicitly.

Values crossing the local Lua boundary are limited to nil, booleans, integers, floats, binaries, lists, and maps with scalar keys. Empty finalization returns {:ok, nil}; multiple final values return an error.

{:ok, summary} =
  Aerospike.query_aggregate_result(
    :aerospike,
    query,
    package,
    "sum_summary",
    [],
    source_path: "priv/udf/user_stats.lua",
    timeout: 10_000
  )

%{"labels" => labels, "sum" => sum} = summary
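Since empty finalization returns {:ok, nil}, a direct pattern match such as the one above raises when the query matches no records. One way to handle the three outcomes is sketched below. FinalResult is a hypothetical helper, and the {:error, reason} shape is an assumption about the client's error returns.

```elixir
defmodule FinalResult do
  # Hypothetical helper for interpreting query_aggregate_result/6 return values.

  # Empty finalization: the stream produced no final value.
  def describe({:ok, nil}), do: :empty

  # Exactly one final value, for example an integer sum or a summary map.
  def describe({:ok, value}), do: {:value, value}

  # Assumption: errors (including multiple final values) arrive as {:error, reason}.
  def describe({:error, reason}), do: {:failed, reason}
end

FinalResult.describe({:ok, nil})
# => :empty

FinalResult.describe({:ok, %{"sum" => 96, "labels" => ["total", "96"]}})
# => {:value, %{"sum" => 96, "labels" => ["total", "96"]}}
```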