Vllm.AsyncLLMEngine (VLLM v0.3.0)


Protocol class for clients of the engine.

Summary

Functions

  • _add_request/7 - Register a request with the output processor and engine core (internal helper).
  • _run_output_handler/2 - Background loop: pulls from EngineCore and pushes to AsyncStreams.
  • abort/4 - Abort a request in the OutputProcessor and EngineCore.
  • add_lora/3 - Load a new LoRA adapter into the engine for future requests.
  • add_request/6 - Add a new request to the AsyncLLM.
  • check_health/2 - Raise if the engine is unhealthy.
  • collective_rpc/4 - Perform a collective RPC call to the given path.
  • dead_error/1
  • do_log_stats/2 - Log engine statistics.
  • encode/6 - Main function called by the API server to kick off a request.
  • errored/1
  • from_engine_args/4 - Create an AsyncLLM from the EngineArgs.
  • from_vllm_config/4 - Create an AsyncLLM from a VllmConfig.
  • generate/5 - Main function called by the API server to kick off a request.
  • get_supported_tasks/2 - Get supported tasks.
  • get_tokenizer/2 - Get the tokenizer.
  • is_paused/2 - Return whether the engine is currently paused.
  • is_running/1
  • is_sleeping/2 - Check whether the engine is sleeping.
  • is_stopped/1
  • is_tracing_enabled/2 - Return whether tracing is enabled.
  • list_loras/2 - List all registered adapters.
  • new/5 - Create an AsyncLLM.
  • pause_generation/2 - Pause generation to allow model weight updates.
  • pin_lora/3 - Prevent an adapter from being evicted.
  • remove_lora/3 - Remove an already loaded LoRA adapter.
  • reset_mm_cache/2 - Reset the multi-modal cache.
  • reset_prefix_cache/3 - Reset the prefix cache and optionally any configured connector cache.
  • resume_generation/2 - Resume generation after pause_generation/2.
  • scale_elastic_ep/4 - Scale the data parallel size up or down by adding or removing engine cores.
  • shutdown/2 - Shut down, cleaning up the background process and IPC.
  • sleep/3 - Sleep the engine.
  • start_profile/2 - Start profiling the engine.
  • stop_profile/2 - Stop profiling the engine.
  • tokenizer/1
  • wait_for_requests_to_drain/3 - Wait for all requests to be drained.
  • wake_up/3 - Wake up the engine.

Types

t()

@opaque t()

Functions

_add_request(ref, request, prompt, parent_req, index, queue, opts \\ [])

@spec _add_request(
  SnakeBridge.Ref.t(),
  term(),
  term(),
  term(),
  integer(),
  term(),
  keyword()
) ::
  {:ok, term()} | {:error, Snakepit.Error.t()}

Register a request with the output processor and engine core (internal helper).

Parameters

  • request (term())
  • prompt (term())
  • parent_req (term())
  • index (integer())
  • queue (term())

Returns

  • term()

_run_output_handler(ref, opts \\ [])

@spec _run_output_handler(
  SnakeBridge.Ref.t(),
  keyword()
) :: {:ok, term()} | {:error, Snakepit.Error.t()}

Background loop: pulls from EngineCore and pushes to AsyncStreams.

Returns

  • term()

abort(ref, request_id, args, opts \\ [])

@spec abort(SnakeBridge.Ref.t(), term(), [term()], keyword()) ::
  {:ok, nil} | {:error, Snakepit.Error.t()}

Abort a request in the OutputProcessor and EngineCore.

Parameters

  • request_id (term())
  • internal (boolean() default: false)

Returns

  • nil

add_lora(ref, lora_request, opts \\ [])

@spec add_lora(SnakeBridge.Ref.t(), term(), keyword()) ::
  {:ok, boolean()} | {:error, Snakepit.Error.t()}

Load a new LoRA adapter into the engine for future requests.

Parameters

  • lora_request (term())

Returns

  • boolean()

add_request(ref, request_id, prompt, params, args, opts \\ [])

@spec add_request(
  SnakeBridge.Ref.t(),
  String.t(),
  term(),
  term(),
  [term()],
  keyword()
) ::
  {:ok, term()} | {:error, Snakepit.Error.t()}

Add a new request to the AsyncLLM.

Parameters

  • request_id (String.t())
  • prompt (term())
  • params (term())
  • arrival_time (term() default: nil)
  • lora_request (term() default: nil)
  • tokenization_kwargs (term() default: nil)
  • trace_headers (term() default: nil)
  • priority (integer() default: 0)
  • data_parallel_rank (term() default: nil)
  • prompt_text (term() default: nil)

Returns

  • term()

check_health(ref, opts \\ [])

@spec check_health(
  SnakeBridge.Ref.t(),
  keyword()
) :: {:ok, nil} | {:error, Snakepit.Error.t()}

Raise if the engine is unhealthy.

Returns

  • nil
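
Example

A minimal liveness-probe sketch, assuming engine is a SnakeBridge.Ref.t() to a running AsyncLLM:

    case Vllm.AsyncLLMEngine.check_health(engine) do
      # {:ok, nil} means the engine raised nothing and is considered healthy.
      {:ok, nil} -> :healthy
      # Any engine or bridge failure surfaces as a Snakepit.Error.
      {:error, reason} -> {:unhealthy, reason}
    end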

collective_rpc(ref, method, args, opts \\ [])

@spec collective_rpc(SnakeBridge.Ref.t(), String.t(), [term()], keyword()) ::
  {:ok, term()} | {:error, Snakepit.Error.t()}

Perform a collective RPC call to the given path.

Parameters

  • method (String.t())
  • timeout (term() default: nil)
  • args (tuple() default: {})
  • kwargs (term() default: nil)

Returns

  • term()
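
Example

A hedged sketch; "echo_rank" is an illustrative method name rather than a real vLLM worker method, and passing timeout through the keyword list is an assumption about the generated binding:

    # Broadcast a method call to all workers and collect the per-worker results.
    {:ok, results} =
      Vllm.AsyncLLMEngine.collective_rpc(engine, "echo_rank", [], timeout: 5.0)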

dead_error(ref)

@spec dead_error(SnakeBridge.Ref.t()) :: {:ok, term()} | {:error, Snakepit.Error.t()}

do_log_stats(ref, opts \\ [])

@spec do_log_stats(
  SnakeBridge.Ref.t(),
  keyword()
) :: {:ok, nil} | {:error, Snakepit.Error.t()}

Log engine statistics.

Returns

  • nil

encode(ref, prompt, pooling_params, request_id, args, opts \\ [])

@spec encode(
  SnakeBridge.Ref.t(),
  term(),
  Vllm.PoolingParamsClass.t(),
  String.t(),
  [term()],
  keyword()
) :: {:ok, term()} | {:error, Snakepit.Error.t()}

Main function called by the API server to kick off a request.

  1. Making an AsyncStream corresponding to the request.
  2. Processing the input.
  3. Adding the request to the EngineCore (separate process).

A separate output_handler loop runs in a background asyncio task, pulling outputs from the EngineCore and putting them into the per-request AsyncStream.

The caller of encode() iterates the returned AsyncGenerator, which yields each RequestOutput as it becomes available.

NOTE: truncate_prompt_tokens is deprecated in v0.14. TODO: Remove truncate_prompt_tokens in v0.15.

Parameters

  • prompt (term())
  • pooling_params (Vllm.PoolingParamsClass.t())
  • request_id (String.t())
  • lora_request (term() default: nil)
  • trace_headers (term() default: nil)
  • priority (integer() default: 0)
  • truncate_prompt_tokens (term() default: nil)
  • tokenization_kwargs (term() default: nil)

Returns

  • term()
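
Example

A pooling-request sketch. The constructor Vllm.PoolingParamsClass.new/1 is assumed here (the type appears in the spec above, but its constructor is not documented on this page):

    {:ok, pooling} = Vllm.PoolingParamsClass.new([])

    # Kick off an embedding request; the result wraps a Python AsyncGenerator
    # that yields pooled outputs for "embed-req-1".
    {:ok, stream_ref} =
      Vllm.AsyncLLMEngine.encode(engine, "Embed this sentence.", pooling, "embed-req-1", [])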

errored(ref)

@spec errored(SnakeBridge.Ref.t()) :: {:ok, term()} | {:error, Snakepit.Error.t()}

from_engine_args(ref, engine_args, args, opts \\ [])

@spec from_engine_args(SnakeBridge.Ref.t(), term(), [term()], keyword()) ::
  {:ok, term()} | {:error, Snakepit.Error.t()}

Create an AsyncLLM from the EngineArgs.

Parameters

  • engine_args (term())
  • start_engine_loop (boolean() default: true)
  • usage_context (term() default: UsageContext.ENGINE_CONTEXT)
  • stat_loggers (term() default: nil)

Returns

  • term()
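
Example

A construction sketch. Both class_ref (a ref to the Python AsyncLLM class; obtaining such refs is bridge-specific) and Vllm.EngineArgs.new/1 are assumptions, not part of this page:

    {:ok, engine_args} = Vllm.EngineArgs.new(model: "facebook/opt-125m")

    # Build the engine and start its background output-handler loop.
    {:ok, engine} =
      Vllm.AsyncLLMEngine.from_engine_args(class_ref, engine_args, [], start_engine_loop: true)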

from_vllm_config(ref, vllm_config, args, opts \\ [])

@spec from_vllm_config(SnakeBridge.Ref.t(), term(), [term()], keyword()) ::
  {:ok, term()} | {:error, Snakepit.Error.t()}

Create an AsyncLLM from a VllmConfig.

Parameters

  • vllm_config (term())
  • start_engine_loop (boolean() default: true)
  • usage_context (term() default: UsageContext.ENGINE_CONTEXT)
  • stat_loggers (term() default: nil)
  • enable_log_requests (boolean() default: false)
  • aggregate_engine_logging (boolean() default: false)
  • disable_log_stats (boolean() default: false)
  • client_addresses (term() default: nil)
  • client_count (integer() default: 1)
  • client_index (integer() default: 0)

Returns

  • term()

generate(ref, prompt, sampling_params, request_id, opts \\ [])

@spec generate(
  SnakeBridge.Ref.t(),
  term(),
  Vllm.SamplingParamsClass.t(),
  String.t(),
  keyword()
) ::
  {:ok, term()} | {:error, Snakepit.Error.t()}

Main function called by the API server to kick off a request.

  1. Making an AsyncStream corresponding to the request.
  2. Processing the input.
  3. Adding the request to the Detokenizer.
  4. Adding the request to the EngineCore (separate process).

A separate output_handler loop runs in a background asyncio task, pulling outputs from the EngineCore and putting them into the per-request AsyncStream.

The caller of generate() iterates the returned AsyncGenerator, which yields each RequestOutput as it becomes available.

Parameters

  • prompt (term())
  • sampling_params (Vllm.SamplingParamsClass.t())
  • request_id (String.t())
  • prompt_text (term(), keyword-only, default: nil)
  • lora_request (term(), keyword-only, default: nil)
  • tokenization_kwargs (term(), keyword-only, default: nil)
  • trace_headers (term(), keyword-only, default: nil)
  • priority (integer(), keyword-only, default: 0)
  • data_parallel_rank (term(), keyword-only, default: nil)

Returns

  • term()
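
Example

A generation sketch, assuming engine is a live ref and that Vllm.SamplingParamsClass.new/1 exists to build sampling params (the type is in the spec above; its constructor is not documented here):

    {:ok, params} = Vllm.SamplingParamsClass.new(max_tokens: 64, temperature: 0.8)
    request_id = "req-#{System.unique_integer([:positive])}"

    case Vllm.AsyncLLMEngine.generate(engine, "Explain KV caching.", params, request_id, priority: 0) do
      {:ok, stream_ref} ->
        # stream_ref wraps the Python AsyncGenerator; how it is consumed is
        # bridge-specific and not covered on this page.
        {:ok, stream_ref}

      {:error, reason} ->
        # Best-effort cleanup of any partially registered request state.
        Vllm.AsyncLLMEngine.abort(engine, request_id, [])
        {:error, reason}
    end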

get_supported_tasks(ref, opts \\ [])

@spec get_supported_tasks(
  SnakeBridge.Ref.t(),
  keyword()
) :: {:ok, {term(), term()}} | {:error, Snakepit.Error.t()}

Get supported tasks

Returns

  • {term(), term()}

get_tokenizer(ref, opts \\ [])

@spec get_tokenizer(
  SnakeBridge.Ref.t(),
  keyword()
) :: {:ok, term()} | {:error, Snakepit.Error.t()}

Get the tokenizer

Returns

  • term()

is_paused(ref, opts \\ [])

@spec is_paused(
  SnakeBridge.Ref.t(),
  keyword()
) :: {:ok, boolean()} | {:error, Snakepit.Error.t()}

Return whether the engine is currently paused.

Returns

  • boolean()

is_running(ref)

@spec is_running(SnakeBridge.Ref.t()) :: {:ok, term()} | {:error, Snakepit.Error.t()}

is_sleeping(ref, opts \\ [])

@spec is_sleeping(
  SnakeBridge.Ref.t(),
  keyword()
) :: {:ok, boolean()} | {:error, Snakepit.Error.t()}

Check whether the engine is sleeping

Returns

  • boolean()

is_stopped(ref)

@spec is_stopped(SnakeBridge.Ref.t()) :: {:ok, term()} | {:error, Snakepit.Error.t()}

is_tracing_enabled(ref, opts \\ [])

@spec is_tracing_enabled(
  SnakeBridge.Ref.t(),
  keyword()
) :: {:ok, boolean()} | {:error, Snakepit.Error.t()}

Return whether tracing is enabled.

Returns

  • boolean()

list_loras(ref, opts \\ [])

@spec list_loras(
  SnakeBridge.Ref.t(),
  keyword()
) :: {:ok, MapSet.t(integer())} | {:error, Snakepit.Error.t()}

List all registered adapters.

Returns

  • MapSet.t(integer())

new(vllm_config, executor_class, log_stats, args, opts \\ [])

@spec new(term(), term(), boolean(), [term()], keyword()) ::
  {:ok, SnakeBridge.Ref.t()} | {:error, Snakepit.Error.t()}

Create an AsyncLLM.

Parameters

  • vllm_config - Global configuration.
  • executor_class - An Executor implementation, e.g. MultiprocExecutor.
  • log_stats - Whether to log stats.
  • usage_context - Usage context of the LLM.
  • mm_registry - Multi-modal registry.
  • use_cached_outputs - Whether to use cached outputs.
  • log_requests - Whether to log requests.
  • start_engine_loop - Whether to start the engine loop.
  • stat_loggers - Customized stat loggers for the engine. If not provided, default stat loggers are used. Note that the stat logger is not stable in V1 and its base class interface might change.

pause_generation(ref, opts \\ [])

@spec pause_generation(
  SnakeBridge.Ref.t(),
  keyword()
) :: {:ok, nil} | {:error, Snakepit.Error.t()}

Pause generation to allow model weight updates.

New generation/encoding requests are blocked until resume.

Parameters

  • wait_for_inflight_requests - When true, waits for in-flight requests to finish before pausing. When false (the default), immediately aborts any in-flight requests.
  • clear_cache - Whether to clear the KV cache and prefix cache after draining. Set to false to preserve the caches for a faster resume. Defaults to true (clear caches).

Returns

  • nil
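
Example

A weight-update window sketch; the option names follow the Parameters list above, and passing them via the keyword list is an assumption about the generated binding:

    # Drain in-flight requests rather than aborting them, and keep the caches
    # for a faster resume.
    {:ok, nil} =
      Vllm.AsyncLLMEngine.pause_generation(engine,
        wait_for_inflight_requests: true,
        clear_cache: false
      )

    # ... swap in the new model weights here ...

    {:ok, nil} = Vllm.AsyncLLMEngine.resume_generation(engine)
    {:ok, false} = Vllm.AsyncLLMEngine.is_paused(engine)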

pin_lora(ref, lora_id, opts \\ [])

@spec pin_lora(SnakeBridge.Ref.t(), integer(), keyword()) ::
  {:ok, boolean()} | {:error, Snakepit.Error.t()}

Prevent an adapter from being evicted.

Parameters

  • lora_id (integer())

Returns

  • boolean()

remove_lora(ref, lora_id, opts \\ [])

@spec remove_lora(SnakeBridge.Ref.t(), integer(), keyword()) ::
  {:ok, boolean()} | {:error, Snakepit.Error.t()}

Remove an already loaded LoRA adapter.

Parameters

  • lora_id (integer())

Returns

  • boolean()
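
Example

An adapter lifecycle sketch; how lora_request is constructed is not documented on this page, and the adapter id 1 is illustrative:

    {:ok, true} = Vllm.AsyncLLMEngine.add_lora(engine, lora_request)

    # MapSet of the integer ids of all currently registered adapters.
    {:ok, ids} = Vllm.AsyncLLMEngine.list_loras(engine)

    # Pin the adapter so it cannot be evicted while it is hot...
    {:ok, true} = Vllm.AsyncLLMEngine.pin_lora(engine, 1)

    # ...and remove it once it is no longer needed.
    {:ok, true} = Vllm.AsyncLLMEngine.remove_lora(engine, 1)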

reset_mm_cache(ref, opts \\ [])

@spec reset_mm_cache(
  SnakeBridge.Ref.t(),
  keyword()
) :: {:ok, nil} | {:error, Snakepit.Error.t()}

Reset the multi-modal cache

Returns

  • nil

reset_prefix_cache(ref, args, opts \\ [])

@spec reset_prefix_cache(SnakeBridge.Ref.t(), [term()], keyword()) ::
  {:ok, boolean()} | {:error, Snakepit.Error.t()}

Reset the prefix cache and optionally any configured connector cache

Parameters

  • reset_running_requests (boolean() default: false)
  • reset_connector (boolean() default: false)

Returns

  • boolean()
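
Example

A cache-reset sketch; passing the two booleans positionally through the args list (in the order shown under Parameters) is an assumption about the generated binding:

    # Leave running requests alone, but also reset the connector cache.
    {:ok, true} = Vllm.AsyncLLMEngine.reset_prefix_cache(engine, [false, true])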

resume_generation(ref, opts \\ [])

@spec resume_generation(
  SnakeBridge.Ref.t(),
  keyword()
) :: {:ok, nil} | {:error, Snakepit.Error.t()}

Resume generation after pause_generation/2.

Returns

  • nil

scale_elastic_ep(ref, new_data_parallel_size, args, opts \\ [])

@spec scale_elastic_ep(SnakeBridge.Ref.t(), integer(), [term()], keyword()) ::
  {:ok, term()} | {:error, Snakepit.Error.t()}

Scale the data parallel size up or down by adding or removing engine cores.

Parameters

  • new_data_parallel_size - The new number of data parallel workers
  • drain_timeout - Maximum time to wait for requests to drain (seconds)

Returns

  • term()
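
Example

An elastic-scaling sketch; passing drain_timeout positionally through the args list is an assumption about the generated binding:

    # Grow data parallelism to 4 engine cores, allowing up to 120 s for
    # in-flight requests to drain first.
    {:ok, _} = Vllm.AsyncLLMEngine.scale_elastic_ep(engine, 4, [120])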

shutdown(ref, opts \\ [])

@spec shutdown(
  SnakeBridge.Ref.t(),
  keyword()
) :: {:ok, term()} | {:error, Snakepit.Error.t()}

Shut down, cleaning up the background process and IPC.

Returns

  • term()

sleep(ref, args, opts \\ [])

@spec sleep(SnakeBridge.Ref.t(), [term()], keyword()) ::
  {:ok, nil} | {:error, Snakepit.Error.t()}

Sleep the engine

Parameters

  • level (integer() default: 1)

Returns

  • nil
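
Example

A sleep/wake sketch; passing level positionally through the args list is an assumption about the generated binding:

    # Put the engine to sleep at level 1 to free resources between bursts.
    {:ok, nil} = Vllm.AsyncLLMEngine.sleep(engine, [1])
    {:ok, true} = Vllm.AsyncLLMEngine.is_sleeping(engine)

    # Wake it up again before serving new requests (see wake_up/3 below).
    {:ok, nil} = Vllm.AsyncLLMEngine.wake_up(engine, [])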

start_profile(ref, opts \\ [])

@spec start_profile(
  SnakeBridge.Ref.t(),
  keyword()
) :: {:ok, nil} | {:error, Snakepit.Error.t()}

Start profiling the engine

Returns

  • nil

stop_profile(ref, opts \\ [])

@spec stop_profile(
  SnakeBridge.Ref.t(),
  keyword()
) :: {:ok, nil} | {:error, Snakepit.Error.t()}

Stop profiling the engine

Returns

  • nil
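
Example

A profiling-window sketch; where the traces are written is configured on the vLLM side and not controlled by this API:

    {:ok, nil} = Vllm.AsyncLLMEngine.start_profile(engine)
    # ... run the requests to be profiled ...
    {:ok, nil} = Vllm.AsyncLLMEngine.stop_profile(engine)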

tokenizer(ref)

@spec tokenizer(SnakeBridge.Ref.t()) :: {:ok, term()} | {:error, Snakepit.Error.t()}

wait_for_requests_to_drain(ref, args, opts \\ [])

@spec wait_for_requests_to_drain(SnakeBridge.Ref.t(), [term()], keyword()) ::
  {:ok, term()} | {:error, Snakepit.Error.t()}

Wait for all requests to be drained.

Parameters

  • drain_timeout (integer() default: 300)

Returns

  • term()

wake_up(ref, args, opts \\ [])

@spec wake_up(SnakeBridge.Ref.t(), [term()], keyword()) ::
  {:ok, nil} | {:error, Snakepit.Error.t()}

Wake up the engine

Parameters

  • tags (term() default: nil)

Returns

  • nil