AlpacaElixir v2.2.0 Alpaca.Stream behaviour

The Alpaca.Stream module manages the connection to Alpaca's websocket streaming API: it negotiates the connection, sends and receives frames, and handles closing and reconnecting. A simple client implementation would be:

defmodule AlpacaStreamClient do
  use Alpaca.Stream, url: "https://paper-api.alpaca.markets/stream"

  def start_link() do
    start_link(["account_updates", "trade_updates"])
  end

  @impl Alpaca.Stream
  def handle_msg(msg, state) do
    # msg is a map, so it must go through inspect/1 before interpolation
    IO.puts "Received a message: #{inspect(msg)}"
    {:ok, state}
  end
end

The url keyword is optional. If omitted, it defaults to "#{Client.api_host()}/stream" to keep backwards compatibility.

Supervision

Alpaca.Stream uses WebSockex under the hood.

WebSockex is implemented as an OTP Special Process and as a result will fit into supervision trees. WebSockex also supports the Supervisor children format introduced in Elixir 1.5, meaning that a child specification could be {ClientModule, [state]}. However, since you may want to provide a t:WebSockex.Conn/0 or a url as well as the state, there are two versions of the child_spec function. If you need functionality beyond that, it is recommended that you override the function or define your own. Just remember to use the version that corresponds with your start_link's arity.
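The {Module, arg} child format described above can be sketched as follows. This is a minimal illustration, not the library's own setup: StandInClient is a hypothetical Agent-based module used so the sketch runs without a network connection. A real client that does `use Alpaca.Stream` would take its place, assuming its start_link/1 accepts the list of streams as in the client example above.

```elixir
defmodule StandInClient do
  # Hypothetical stand-in for a module that does `use Alpaca.Stream`.
  # An Agent keeps this sketch runnable without opening a websocket.
  use Agent

  def start_link(streams) when is_list(streams) do
    Agent.start_link(fn -> streams end, name: __MODULE__)
  end

  # Returns the streams this stand-in was started with.
  def streams, do: Agent.get(__MODULE__, & &1)
end

children = [
  # {Module, arg} expands to Module.child_spec(arg), which by default
  # starts the child by calling Module.start_link(arg).
  {StandInClient, ["account_updates", "trade_updates"]}
]

# :one_for_one restarts only the child that crashed.
{:ok, sup} = Supervisor.start_link(children, strategy: :one_for_one)
```

If the client's start_link takes a different number of arguments, the child specification has to be adjusted to match that arity.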

Summary

Callbacks

Defines how to handle the messages received from the websocket. You can expect each message to be a map, since the binary payload is decoded from JSON.

Callbacks

Specs

handle_msg(msg :: map(), state :: term()) ::
  {:ok, new_state}
  | {:reply, WebSockex.frame(), new_state}
  | {:close, new_state}
  | {:close, WebSockex.close_frame(), new_state}
when new_state: term()

Defines how to handle the messages received from the websocket. You can expect each message to be a map, since the binary payload is decoded from JSON.

Example

defmodule TestStream do
  use Alpaca.Stream, url: "wss://data.alpaca.markets/stream"

  @impl Alpaca.Stream
  def handle_msg(msg, state) do
    IO.puts "Received a message: #{inspect(msg)}"
    {:ok, state}
  end
end
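The example above only returns {:ok, state}. The other return shapes in the spec could be used along these lines. This is a sketch under assumptions: the message shape %{"stream" => name, "data" => payload}, the "unauthorized" status, and the "heartbeat" stream are illustrative and not confirmed by this page. The module is defined without `use Alpaca.Stream` so that it runs standalone; in a real client the clauses would sit under `@impl Alpaca.Stream`.

```elixir
defmodule HandleMsgSketch do
  # Hypothetical module showing the return shapes of handle_msg/2.
  # Assumed message shape: %{"stream" => name, "data" => payload}.

  # Count fills in the state and keep the connection open.
  def handle_msg(%{"stream" => "trade_updates", "data" => %{"event" => "fill"}}, state) do
    {:ok, Map.update(state, :fills, 1, &(&1 + 1))}
  end

  # Close with a close frame: {close_code, reason}. 1000 is a normal closure.
  def handle_msg(%{"stream" => "authorization", "data" => %{"status" => "unauthorized"}}, state) do
    {:close, {1000, "unauthorized"}, state}
  end

  # Hypothetical "heartbeat" stream: answer with a text frame.
  def handle_msg(%{"stream" => "heartbeat"}, state) do
    {:reply, {:text, ~s({"action":"heartbeat_ack"})}, state}
  end

  # Ignore everything else and leave the state unchanged.
  def handle_msg(_msg, state), do: {:ok, state}
end
```

Pattern matching in the function heads keeps each case separate, and the final catch-all clause prevents a FunctionClauseError when an unexpected message arrives.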