The public UDF surface covers package list/register/remove, single-record UDF execution, batch record UDF execution, background query UDF jobs, and aggregate queries. It does not claim broad UDF package tooling beyond those commands.
Lua Package Example
A UDF package is a .lua file with one or more named functions. Record UDFs
receive the target record as the first argument, followed by the arguments
passed from Elixir.
-- priv/udf/records.lua
function mark_seen(rec, source)
local visits = rec["visits"]
if visits == nil then
visits = 0
end
rec["visits"] = visits + 1
rec["last_seen_source"] = source
aerospike:update(rec)
return rec["visits"]
endAfter registering this file as records.lua, call it with package name
"records" and function name "mark_seen".
Aggregate UDFs receive a stream instead of one record. The same package can contain record UDF functions and aggregate functions, but aggregate functions must return stream operations.
-- priv/udf/user_stats.lua
local function add_bin(total, rec, bin_name)
return total + (rec[bin_name] or 0)
end
local function add_age(total, rec)
return add_bin(total, rec, "age")
end
local function merge_sum(left, right)
return left + right
end
function sum_age(stream, bin_name)
return stream : aggregate(0, function(total, rec)
return add_bin(total, rec, bin_name)
end) : reduce(merge_sum)
end
function sum_summary(stream)
return stream : aggregate(0, add_age) : reduce(merge_sum) : map(function(total)
return map{sum = total, labels = {"total", tostring(total)}}
end)
endPackage Lifecycle
Upload a Lua package from a readable path or inline source. The server filename
usually includes .lua; record and query execution use the package name
without that suffix.
server_name = "records.lua"
package = "records"
{:ok, task} =
Aerospike.register_udf(:aerospike, "priv/udf/records.lua", server_name)
:ok = Aerospike.RegisterTask.wait(task, timeout: 10_000, poll_interval: 200)
{:ok, udfs} = Aerospike.list_udfs(:aerospike)
Enum.find(udfs, &(&1.filename == server_name))Remove by server filename:
:ok = Aerospike.remove_udf(:aerospike, server_name)Single-Record UDFs
apply_udf/6 executes one function against one record key.
key = Aerospike.key("test", "users", "user:42")
{:ok, _metadata} =
Aerospike.put(:aerospike, key, %{"visits" => 1})
{:ok, result} =
Aerospike.apply_udf(:aerospike, key, package, "mark_seen", ["web"])The helper accepts write-family options such as :timeout, :socket_timeout,
:ttl, :generation, :generation_policy, :commit_level, :filter, and
:txn. Once a record UDF request is on the wire, transport failures are not
retried automatically because server-side effects may already have occurred.
Background Query UDF Jobs
query_udf/6 starts a background job and returns an Aerospike.ExecuteTask.
query =
Aerospike.Query.new("test", "users")
|> Aerospike.Query.where(Aerospike.Filter.equal("status", "inactive"))
{:ok, task} =
Aerospike.query_udf(:aerospike, query, package, "deactivate", [])
:ok = Aerospike.ExecuteTask.wait(task, timeout: 30_000, poll_interval: 500)Pass node: node_name when the background job should target one active node.
Aggregate Streams
query_aggregate/6 returns the partial values emitted by the server. The
client does not run local Lua finalization on this path.
server_name = "user_stats.lua"
package = "user_stats"
{:ok, task} =
Aerospike.register_udf(:aerospike, "priv/udf/user_stats.lua", server_name)
:ok = Aerospike.RegisterTask.wait(task, timeout: 10_000, poll_interval: 200)
query =
Aerospike.Query.new("test", "users")
|> Aerospike.Query.where(Aerospike.Filter.range("age", 18, 65))
{:ok, partials} =
Aerospike.query_aggregate(:aerospike, query, package, "sum_age", ["age"],
timeout: 10_000
)
total = partials |> Enum.to_list() |> Enum.sum()In this example, the server package was registered as "user_stats.lua", the
package argument is "user_stats", the function argument is "sum_age", and
["age"] is passed to the Lua function after the stream.
Finalized Aggregate Results
Use query_aggregate_result/6 when the caller wants one locally finalized
result. The local Lua source is required even when the package is already
registered on the server.
{:ok, total} =
Aerospike.query_aggregate_result(
:aerospike,
query,
package,
"sum_age",
["age"],
source_path: "priv/udf/user_stats.lua",
timeout: 10_000
)Pass exactly one of source: lua_source or source_path: path. Missing
source, both source options, unreadable files, unsupported local arguments, or
node: node_name return an invalid-argument error before the server query is
opened.
The registered server package and the local source are related but separate:
- Register
"user_stats.lua"on the server before callingquery_aggregate/6orquery_aggregate_result/6. - Pass package name
"user_stats"and function name"sum_age"to the query call. - Pass
source_path: "priv/udf/user_stats.lua"orsource: lua_sourceonly forquery_aggregate_result/6, because local finalization needs the Lua source available to the Elixir client. - Keep the local source aligned with the registered package. The client does not fetch Lua source back from the server or infer a path from the package name.
The local reducer runs in a bounded Lua state. Supported stream helpers are
map, filter, aggregate, and reduce; logging helpers are no-ops. Local
filesystem, OS, package loading, dynamic loading, debug access, require,
groupby, list, bytes, and record/database mutation or lookup helpers
fail explicitly.
Values crossing the local Lua boundary are limited to nil, booleans,
integers, floats, binaries, lists, and maps with scalar keys. Empty
finalization returns {:ok, nil}; multiple final values return an error.
{:ok, summary} =
Aerospike.query_aggregate_result(
:aerospike,
query,
package,
"sum_summary",
[],
source_path: "priv/udf/user_stats.lua",
timeout: 10_000
)
%{"labels" => labels, "sum" => sum} = summary