Tinkex.SamplingClient (Tinkex v0.3.4)

Sampling client that performs lock-free reads via ETS.

Initialization runs in a GenServer, which creates the sampling session and registers its state in Tinkex.SamplingRegistry. Once initialized, sample/4 reads configuration directly from ETS without calling the GenServer, so the process does not become a bottleneck under high load.
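The pattern described above can be illustrated with a minimal, standard-library-only sketch. This is not Tinkex's implementation: the module EtsReadSketch, the table name, and fetch_config/1 are hypothetical, and for brevity the server owns its own table here, whereas the text above says Tinkex registers state in Tinkex.SamplingRegistry.

```elixir
defmodule EtsReadSketch do
  # Hypothetical illustration of "init in a GenServer, read from ETS".
  use GenServer

  @table :ets_read_sketch

  def start_link(config), do: GenServer.start_link(__MODULE__, config)

  # Hot path: the caller reads ETS directly and never sends a message
  # to the GenServer, so concurrent readers do not queue behind it.
  def fetch_config(pid) do
    case :ets.lookup(@table, pid) do
      [{^pid, config}] -> {:ok, config}
      [] -> {:error, :not_initialized}
    end
  end

  @impl true
  def init(config) do
    # One-time setup: create the table and publish this process's config.
    # :protected means only the owner writes while any process may read.
    :ets.new(@table, [:named_table, :protected, :set, read_concurrency: true])
    :ets.insert(@table, {self(), config})
    {:ok, config}
  end
end

{:ok, pid} = EtsReadSketch.start_link(%{model: "example-model"})
{:ok, config} = EtsReadSketch.fetch_config(pid)
IO.inspect(config)
```

Because start_link/1 returns only after init/1 has finished, the lookup that follows it finds the entry. In this sketch the named table allows a single server instance, and the table disappears when that process exits.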

For plain-text prompts, build a Tinkex.Types.ModelInput via Tinkex.Types.ModelInput.from_text/2 with the target model name. Chat templates are not applied automatically.

Queue State Observer

This client implements Tinkex.QueueStateObserver and automatically logs human-readable warnings when queue state changes indicate rate limiting or capacity issues:

[warning] Sampling is paused for session-123. Reason: concurrent sampler weights limit hit

Warnings are debounced per session: at most one is logged every 60 seconds, to avoid log spam.
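As a rough illustration of per-session debouncing, the sketch below keeps the time of the last warning for each session in a map. The module DebounceSketch and its functions are hypothetical and are not how Tinkex stores its debounce state; clear/2 only mirrors the intent of clear_queue_state_debounce/1.

```elixir
defmodule DebounceSketch do
  # Hypothetical illustration of per-session log debouncing.
  require Logger

  @interval_ms 60_000

  # `last_logged` maps session_id => time (ms) of the last emitted warning.
  # Returns {:logged | :skipped, updated_map}.
  def maybe_log(last_logged, session_id, now_ms, message) do
    case Map.get(last_logged, session_id) do
      last when is_integer(last) and now_ms - last < @interval_ms ->
        # A warning was emitted for this session less than 60 seconds ago.
        {:skipped, last_logged}

      _ ->
        Logger.warning(message)
        {:logged, Map.put(last_logged, session_id, now_ms)}
    end
  end

  # Drop a finished session's entry so the map does not grow without bound.
  def clear(last_logged, session_id), do: Map.delete(last_logged, session_id)
end

msg = "Sampling is paused for session-123."
state = %{}
{:logged, state} = DebounceSketch.maybe_log(state, "session-123", 0, msg)
{:skipped, state} = DebounceSketch.maybe_log(state, "session-123", 1_000, msg)
{:logged, state} = DebounceSketch.maybe_log(state, "session-123", 60_000, msg)
state = DebounceSketch.clear(state, "session-123")
```

Passing the current time in as an argument keeps the function pure apart from the log call, which makes the 60-second window easy to exercise without waiting.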

Types

t()

@type t() :: pid()

Functions

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

See Supervisor.

clear_queue_state_debounce(session_id)

@spec clear_queue_state_debounce(String.t()) :: :ok

Clear debounce state for a sampling session to avoid unbounded growth.

compute_logprobs(client, prompt, opts \\ [])

@spec compute_logprobs(t(), map(), keyword()) ::
  {:ok, Task.t()} | {:error, Tinkex.Error.t()}

Convenience helper to compute prompt token log probabilities.

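Returns {:ok, task} on success, where task is a Task.t() that yields {:ok, [float() | nil]} or {:error, %Tinkex.Error{}}. Returns {:error, %Tinkex.Error{}} if the request could not be submitted.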
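A possible way to consume this two-step result is sketched below. It assumes client is a started sampling client and prompt is a Tinkex.Types.ModelInput built elsewhere; the wrapper module, the function name, and the 30-second timeout are illustrative choices, not part of Tinkex.

```elixir
defmodule LogprobsUsageSketch do
  # Hypothetical wrapper; only compute_logprobs/2 comes from the docs above.
  def total_logprob(client, prompt, timeout_ms \\ 30_000) do
    with {:ok, task} <- Tinkex.SamplingClient.compute_logprobs(client, prompt),
         {:ok, logprobs} <- Task.await(task, timeout_ms) do
      # Entries may be nil (documented as [float() | nil]); skip them when summing.
      total = logprobs |> Enum.reject(&is_nil/1) |> Enum.sum()
      {:ok, total}
    end
  end
end
```

Any {:error, %Tinkex.Error{}} from either step falls through the with expression unchanged, so the caller handles a single error shape.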

create_async(service_client, opts \\ [])

@spec create_async(
  pid(),
  keyword()
) :: Task.t()

Create a sampling client asynchronously.

This is a convenience function that delegates to ServiceClient.create_sampling_client_async/2.

Examples

task = SamplingClient.create_async(service_pid, base_model: "meta-llama/Llama-3.2-1B")
{:ok, sampling_pid} = Task.await(task)

get_telemetry(client)

sample(client, prompt, sampling_params, opts \\ [])

@spec sample(t(), map(), map(), keyword()) ::
  {:ok, Task.t()} | {:error, Tinkex.Error.t()}

Submit a sampling request.

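Returns {:ok, task} on success, where task is a Task.t() that yields {:ok, %SampleResponse{}} or {:error, %Tinkex.Error{}}. Returns {:error, %Tinkex.Error{}} if the request could not be submitted.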
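Because each call hands back a task, several requests can be submitted before any result is awaited. The sketch below shows one way to do that; the module, the function name, and the timeout are hypothetical, and the contents of sampling_params are left to the caller since this page does not list its fields.

```elixir
defmodule SampleUsageSketch do
  # Hypothetical helper; only sample/3 comes from the docs above.
  def sample_many(client, prompts, sampling_params, timeout_ms \\ 60_000) do
    # Submit every request first so they can run concurrently.
    submitted =
      Enum.map(prompts, &Tinkex.SamplingClient.sample(client, &1, sampling_params))

    case Enum.find(submitted, &match?({:error, _}, &1)) do
      nil ->
        tasks = Enum.map(submitted, fn {:ok, task} -> task end)
        # Each element is {:ok, %SampleResponse{}} or {:error, %Tinkex.Error{}}.
        {:ok, Task.await_many(tasks, timeout_ms)}

      {:error, _} = error ->
        # Simplification: tasks that were already submitted are not awaited here.
        error
    end
  end
end
```

Task.await_many/2 must be called from the process that submitted the requests, and it exits if any task fails to reply within the timeout.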

sample_stream(client, prompt, sampling_params, opts \\ [])

@spec sample_stream(t(), map(), map(), keyword()) ::
  {:ok, Enumerable.t()} | {:error, Tinkex.Error.t()}

Stream a sampling request, yielding tokens incrementally via SSE.

Returns {:ok, stream} where stream is an Enumerable.t() of Tinkex.Types.SampleStreamChunk structs, or {:error, %Tinkex.Error{}}.

Examples

{:ok, stream} = SamplingClient.sample_stream(client, prompt, params)
Enum.each(stream, fn chunk ->
  IO.write(chunk.token)
end)

start_link(opts \\ [])

@spec start_link(keyword()) :: GenServer.on_start()