KNXnetIP v0.2.0 KNXnetIP.Tunnel behaviour

A behaviour module for implementing KNXnet/IP tunnel clients.

The behaviour wraps Connection and attempts to ensure that the client is always connected to the KNX network.

The callback module is invoked when telegrams are received from the tunnel server, and when the connection status changes.

To send a telegram to the tunnel server, the callback module must return a :send_telegram tuple when a callback is invoked. The behaviour will then wrap the encoded telegram in a TUNNELLING_REQUEST and send it to the tunnel server. When the tunnel server responds with a TUNNELLING_ACK, the callback on_telegram_ack/1 is invoked.

The callback module must wait for on_telegram_ack/1 to be invoked before sending any further telegrams. If the callback module tries to send a new telegram while the client is waiting to receive a TUNNELLING_ACK from a previous TUNNELLING_REQUEST, the new telegram will be discarded.

The telegram is also discarded if the callback module tries to send a telegram while the client is not connected.
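
Because a telegram sent while an acknowledgement is outstanding is discarded, a callback module may want to buffer outgoing telegrams itself. The following is a minimal sketch of one way to do that. It assumes a hypothetical helper module, KnxApp.SendQueue, which is not part of KNXnetIP: the result of push/2 would be returned from handle_cast/2, and the result of ack/1 from on_telegram_ack/1.

```elixir
defmodule KnxApp.SendQueue do
  # Hypothetical helper, not part of KNXnetIP. It keeps at most one
  # telegram in flight and buffers the rest until the ack arrives.

  # Fresh bookkeeping: nothing queued, nothing awaiting an ack.
  def new, do: %{queue: :queue.new(), awaiting_ack: false}

  # Nothing in flight: send immediately and remember that an ack is pending.
  def push(%{awaiting_ack: false} = state, telegram) do
    {:send_telegram, telegram, %{state | awaiting_ack: true}}
  end

  # A telegram is in flight: buffer this one instead of losing it.
  def push(%{awaiting_ack: true} = state, telegram) do
    {:noreply, %{state | queue: :queue.in(telegram, state.queue)}}
  end

  # Called when the previous telegram was acked: send the next one, if any.
  def ack(state) do
    case :queue.out(state.queue) do
      {{:value, telegram}, rest} ->
        {:send_telegram, telegram, %{state | queue: rest}}

      {:empty, _queue} ->
        {:ok, %{state | awaiting_ack: false}}
    end
  end
end
```

A real callback module would probably also reset this bookkeeping in on_connect/1 and on_disconnect/2, since a telegram that was in flight when the connection dropped will never be acknowledged.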

The behaviour deals with telegrams as raw binaries. KNXnetIP.Telegram defines a data structure to represent telegrams, as well as functions for encoding and decoding telegrams.

Example

This example will bind to the interface given by my_ip and connect to the server at server_ip.

The process registers itself with the name KnxApp.Tunnel, and other processes can send group writes to the KNX bus by using the send_group_write function. In addition, the tunnel process will forward any group writes it receives to the process that started the tunnel.

Note the map called group_addresses. This maps a certain group address to a datapoint type, and is used when encoding and decoding datapoints. This is necessary to determine the correct codec for the value in a given telegram, as the datapoint type is not present in the telegram.

defmodule KnxApp.Tunnel do
  require Logger
  alias KNXnetIP.{Datapoint, Telegram, Tunnel}
  @behaviour Tunnel

  def start_link(my_ip, server_ip) do
    knxnet_ip_opts = [
      ip: my_ip,
      server_ip: server_ip
    ]

    # 5.001 is DPT_Scaling, an 8-bit unsigned integer with unit and resolution of 0.4 %.
    # 14.056 is DPT_Value_Power, a 32-bit float with unit and resolution of 1 W.
    group_addresses = %{
      "2/0/2" => "5.001",
      "2/0/3" => "5.001",
      "2/0/4" => "5.001",
      "4/4/52" => "14.056",
      "4/4/56" => "14.056",
      "4/4/60" => "14.056"
    }

    opts = [parent: self(), group_addresses: group_addresses]

    Tunnel.start_link(__MODULE__, opts, knxnet_ip_opts, name: __MODULE__)
  end

  @doc "Encodes `value` according to the DPT of the `group_address`, and sends it in a `GroupValueWrite` to `group_address`"
  @spec send_group_write(binary(), term()) :: :ok | {:error, :unknown_group_address}
  def send_group_write(group_address, value) do
    Tunnel.call(__MODULE__, {:group_write, group_address, value})
  end

  @doc "Sends a `GroupValueRead` to `group_address`"
  @spec send_group_read(binary()) :: :ok | {:error, :unknown_group_address}
  def send_group_read(group_address) do
    Tunnel.call(__MODULE__, {:group_read, group_address})
  end

  @impl true
  def init(opts) do
    state = %{
      parent: Keyword.fetch!(opts, :parent),
      group_addresses: Keyword.fetch!(opts, :group_addresses)
    }

    {:ok, state}
  end

  @impl true
  def handle_call({:group_write, group_address, value}, _from, state) do
    case state.group_addresses[group_address] do
      nil ->
        {:reply, {:error, :unknown_group_address}, state}

      datapoint_type ->
        {:ok, value} = Datapoint.encode(value, datapoint_type)

        {:ok, telegram} =
          Telegram.encode(%Telegram{
            source: "0.0.0",
            destination: group_address,
            service: :group_write,
            type: :request,
            value: value
          })

        {:send_telegram, telegram, :ok, state}
    end
  end

  def handle_call({:group_read, group_address}, _from, state) do
    case state.group_addresses[group_address] do
      nil ->
        {:reply, {:error, :unknown_group_address}, state}

      _ ->
        {:ok, telegram} =
          Telegram.encode(%Telegram{
            source: "0.0.0",
            destination: group_address,
            service: :group_read,
            type: :request,
            value: <<0::6>>
          })

        {:send_telegram, telegram, :ok, state}
    end
  end

  @impl true
  def code_change(_vsn, state, _extra), do: {:ok, state}

  @impl true
  def terminate(_reason, state), do: state

  @impl true
  def on_connect(state), do: {:ok, state}

  @impl true
  def on_disconnect(reason, state) do
    case reason do
      :disconnect_requested ->
        {:backoff, 0, state}

      {:tunnelling_ack_error, _} ->
        {:backoff, 0, state}

      {:connectionstate_response_error, _} ->
        {:backoff, 0, state}

      {:connect_response_error, _} ->
        {:backoff, 5_000, state}
    end
  end

  @impl true
  def on_telegram_ack(state), do: {:ok, state}

  @impl true
  def on_telegram(telegram, state) do
    {:ok, telegram} = Telegram.decode(telegram)
    handle_telegram(telegram, state)
    {:ok, state}
  end

  defp handle_telegram(%Telegram{service: service} = telegram, state)
      when service in [:group_write, :group_response] do
    case state.group_addresses[telegram.destination] do
      nil ->
        Logger.info(fn -> "Ignoring unspecified group address: #{telegram.destination}" end)

      datapoint_type ->
        {:ok, value} = Datapoint.decode(telegram.value, datapoint_type)
        send(state.parent, {service, telegram.destination, value})
    end
  end

  # Ignore telegrams which are not group writes or group responses
  defp handle_telegram(_telegram, _state), do: :ok
end
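
To illustrate why the group_addresses map in the example above is needed, the sketch below decodes raw values for the two datapoint types used there. KnxApp.DptSketch is a hypothetical stand-in written only for illustration; it is not KNXnetIP.Datapoint and covers nothing beyond these two types.

```elixir
defmodule KnxApp.DptSketch do
  # Hypothetical stand-in for illustration; real code would call
  # KNXnetIP.Datapoint instead.

  # DPT 5.001 (DPT_Scaling): one unsigned byte, 0..255 mapped linearly to 0..100 %.
  def decode(<<byte::8>>, "5.001"), do: {:ok, byte * 100 / 255}

  # DPT 14.056 (DPT_Value_Power): a 32-bit IEEE 754 float, in watts.
  def decode(<<watts::float-size(32)>>, "14.056"), do: {:ok, watts}

  # The payload does not fit the requested datapoint type.
  def decode(_raw, _datapoint_type), do: {:error, :invalid_value}

  # The telegram only carries the address and raw bytes, so the
  # datapoint type has to be looked up before decoding.
  def decode_for(group_addresses, address, raw) do
    case Map.fetch(group_addresses, address) do
      {:ok, datapoint_type} -> decode(raw, datapoint_type)
      :error -> {:error, :unknown_group_address}
    end
  end
end
```

The same bytes are meaningless without the lookup: a four-byte payload is a valid power reading for "4/4/52" but is rejected for an address mapped to "5.001".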

Summary

Types

Tunnel options that can be passed to the start_link/4 function

Functions

Sends a synchronous call to the Tunnel process and waits for a reply

Sends a synchronous request to the Tunnel process and waits for a reply

Sends an asynchronous request to the Tunnel process

Sends a reply to a request sent by call/3

Starts a tunnel client linked to the calling process

Callbacks

This callback is the same as Connection.code_change/3 and is used to change the state when loading a different version of the callback module

Called when the process receives a call message sent by call/3. This callback has the same arguments as the GenServer equivalent and the :reply, :noreply and :stop return tuples behave the same. However there are two additional return values

Called when the process receives a cast message sent by cast/3. This callback has the same arguments as the GenServer equivalent and the :noreply and :stop return tuples behave the same. However there is one additional return value

Called when the process receives a message that is not a call or cast. This callback has the same arguments as the GenServer equivalent and the :noreply and :stop return tuples behave the same. However there is one additional return value

Invoked when the tunnel process is started. start_link/4 will block until it returns

Called when the process successfully establishes a connection to the tunnel server

Called when the process fails to establish a connection to the tunnel server, or if the process disconnects due to a protocol error

Called when the process receives a new TUNNELLING_REQUEST

Called when the process receives a TUNNELLING_ACK which matches the last sent TUNNELLING_REQUEST

This callback is the same as Connection.terminate/2 and is called when the process terminates. The first argument is the reason the process is about to exit with

Types

disconnect_reason()
disconnect_reason() ::
  :disconnect_requested
  | {:tunnelling_ack_error, error()}
  | {:connectionstate_response_error, error()}
  | {:connect_response_error, error()}

error()
error() ::
  :e_host_protocol_type
  | :e_version_not_supported
  | :e_sequence_number
  | :e_connection_id
  | :e_connection_type
  | :e_connection_option
  | :e_no_more_connections
  | :e_data_connection
  | :e_knx_connection
  | :timeout

option()
option() ::
  {:ip, :inet.socket_address()}
  | {:control_port, :inet.port_number()}
  | {:data_port, :inet.port_number()}
  | {:server_ip, :inet.socket_address() | :inet.hostname()}
  | {:server_control_port, :inet.port_number()}
  | {:heartbeat_timeout, integer()}
  | {:connect_response_timeout, integer()}
  | {:connectionstate_response_timeout, integer()}
  | {:disconnect_response_timeout, integer()}
  | {:tunnelling_ack_timeout, integer()}

Tunnel options that can be passed to the start_link/4 function.

Timeouts default to the values given in the specification. Override them only if you know what you are doing.

  • ip: IP that the tunnel client should bind to and advertise to the tunnel server, e.g. {192, 168, 1, 10}. Default: {127, 0, 0, 1}.
  • control_port: Port that the tunnel client should bind to for control communication. Set to zero to use a random, unused port. Default: 0.
  • data_port: Port that the tunnel client should bind to for data communication. Set to zero to use a random, unused port. Default: 0.
  • server_ip: IP or hostname of the tunnel server to connect to, e.g. {192, 168, 1, 10}. Default: {127, 0, 0, 1}.
  • server_control_port: Control port of tunnel server. Default: 3671.
  • heartbeat_timeout: Number of milliseconds to wait before sending a CONNECTIONSTATE_REQUEST. Default: 60_000.
  • connect_response_timeout: Number of milliseconds to wait for a CONNECT_RESPONSE before triggering timeout. Default: 10_000.
  • connectionstate_response_timeout: Number of milliseconds to wait for a CONNECTIONSTATE_RESPONSE before triggering timeout. Default: 10_000.
  • disconnect_response_timeout: Number of milliseconds to wait for a DISCONNECT_RESPONSE before triggering timeout. Default: 5_000.
  • tunnelling_ack_timeout: Number of milliseconds to wait for a TUNNELLING_ACK before triggering timeout. Default: 1_000.
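
As a rough illustration of how these options might be assembled, the sketch below builds a keyword list that could be passed as the tunnel_opts argument of start_link/4. KnxApp.TunnelConfig and the IP addresses are assumptions made for the example; only the option keys and the default port values come from the list above.

```elixir
defmodule KnxApp.TunnelConfig do
  # Hypothetical helper. The addresses are illustrative; the keys are
  # those documented for option().
  def tunnel_opts(overrides \\ []) do
    defaults = [
      ip: {192, 168, 1, 10},
      server_ip: {192, 168, 1, 20},
      # Documented default control port of the tunnel server.
      server_control_port: 3671,
      # Zero asks for a random, unused local port.
      control_port: 0,
      data_port: 0
    ]

    # Values in `overrides` win over the defaults above.
    Keyword.merge(defaults, overrides)
  end
end
```

The timeout options are deliberately left out here so that the behaviour's own defaults apply.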

options()
options() :: [option()]

Functions

Sends a synchronous call to the Tunnel process and waits for a reply.

See Connection.call/2 for more information.

call(conn, req, timeout)
call(GenServer.server(), term(), timeout()) :: term()

Sends a synchronous request to the Tunnel process and waits for a reply.

See Connection.call/3 for more information.

Sends an asynchronous request to the Tunnel process.

See Connection.cast/2 for more information.

reply(from, response)
reply(GenServer.from(), term()) :: :ok

Sends a reply to a request sent by call/3.

See Connection.reply/2 for more information.

start_link(module, module_args, tunnel_opts, genserver_opts \\ [])

Starts a tunnel client linked to the calling process.

Once the server is started, the init/1 function of the given module is called with module_args as its argument to initialize the server.

tunnel_opts contains a list of options used by the Tunnel behaviour. These include the IP and port(s) which the client should bind to, and the IP and the port of the tunnel server to connect to. For more information see options/0.

genserver_opts are passed directly to GenServer. See GenServer.start_link/3 for more information.
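
Since the tunnel client is linked to the process that starts it, one plausible setup is to start it under a supervisor. The sketch below only builds a child specification map; it assumes a callback module such as KnxApp.Tunnel from the example section, exposing a start_link/2 wrapper around start_link/4. KnxApp.Children is a hypothetical name.

```elixir
defmodule KnxApp.Children do
  # Hypothetical helper. Assumes KnxApp.Tunnel.start_link/2 exists, as in
  # the example section, and wraps KNXnetIP.Tunnel.start_link/4.
  def tunnel_child_spec(my_ip, server_ip) do
    %{
      id: KnxApp.Tunnel,
      # The supervisor calls KnxApp.Tunnel.start_link(my_ip, server_ip).
      start: {KnxApp.Tunnel, :start_link, [my_ip, server_ip]},
      restart: :permanent,
      type: :worker
    }
  end
end
```

A map like this can be placed in the children list given to Supervisor.start_link/2.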

Callbacks

code_change(vsn, state, extra)
code_change(vsn :: term(), state :: term(), extra :: term()) ::
  {:ok, state :: term()}

This callback is the same as Connection.code_change/3 and is used to change the state when loading a different version of the callback module.

handle_call(message, from, state) (optional)
handle_call(message :: term(), from :: {pid(), term()}, state :: term()) ::
  {:send_telegram, telegram :: binary(), state :: term()}
  | {:send_telegram, telegram :: binary(), reply :: term(), state :: term()}
  | {:reply, reply :: term(), state :: term()}
  | {:reply, reply :: term(), state :: term(), :hibernate}
  | {:reply, reply :: term(), state :: term(), timeout :: timeout()}
  | {:noreply, state :: term()}
  | {:noreply, state :: term(), :hibernate}
  | {:noreply, state :: term(), timeout :: timeout()}
  | {:stop, reason :: term(), state :: term()}
  | {:stop, reason :: term(), reply :: term(), state :: term()}

Called when the process receives a call message sent by call/3. This callback has the same arguments as the GenServer equivalent and the :reply, :noreply and :stop return tuples behave the same. However there are two additional return values:

Returning {:send_telegram, telegram, state} will send the telegram to the tunnel server in a TUNNELLING_REQUEST, and then continue the loop with new state state.

Returning {:send_telegram, telegram, reply, state} will send the telegram to the tunnel server in a TUNNELLING_REQUEST, and then reply to the caller. The process will then continue the loop with new state state.
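
The sketch below shows one way the four-element :send_telegram tuple could be combined with a plain :reply. It assumes the callback module tracks the connection status itself in a hypothetical connected field, so that callers get an explicit error instead of having their telegram silently discarded while disconnected. KnxApp.GuardedSend and the {:send, telegram} message are made up for the example.

```elixir
defmodule KnxApp.GuardedSend do
  # Hypothetical callback functions; `connected` is a field this sketch
  # maintains itself, not something provided by the behaviour.

  def on_connect(state), do: {:ok, %{state | connected: true}}

  def on_disconnect(_reason, state) do
    {:backoff, 0, %{state | connected: false}}
  end

  # Connected: send the telegram and reply :ok to the caller.
  def handle_call({:send, telegram}, _from, %{connected: true} = state) do
    {:send_telegram, telegram, :ok, state}
  end

  # Not connected: the telegram would be discarded, so report an error instead.
  def handle_call({:send, _telegram}, _from, state) do
    {:reply, {:error, :not_connected}, state}
  end
end
```

Note that the :ok reply only means the telegram was handed to the behaviour; delivery is confirmed later through on_telegram_ack/1.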

handle_cast(message, state) (optional)
handle_cast(message :: term(), state :: term()) ::
  {:send_telegram, telegram :: binary(), state :: term()}
  | {:noreply, state :: term()}
  | {:noreply, state :: term(), timeout :: timeout() | :hibernate}
  | {:stop, reason :: term(), state :: term()}

Called when the process receives a cast message sent by cast/3. This callback has the same arguments as the GenServer equivalent and the :noreply and :stop return tuples behave the same. However there is one additional return value:

Returning {:send_telegram, telegram, state} will send the telegram to the tunnel server in a TUNNELLING_REQUEST, and then continue the loop with new state state.

handle_info(message, state) (optional)
handle_info(message :: term(), state :: term()) ::
  {:send_telegram, telegram :: binary(), state :: term()}
  | {:noreply, state :: term()}
  | {:noreply, state :: term(), timeout :: timeout() | :hibernate}
  | {:stop, reason :: term(), state :: term()}

Called when the process receives a message that is not a call or cast. This callback has the same arguments as the GenServer equivalent and the :noreply and :stop return tuples behave the same. However there is one additional return value:

Returning {:send_telegram, telegram, state} will send the telegram to the tunnel server in a TUNNELLING_REQUEST, and then continue the loop with new state state.
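
One use for the :send_telegram return value in handle_info/2 is periodic polling. The sketch below is an assumption about how that might look: KnxApp.Poller is a hypothetical module, and read_telegram is assumed to be an already encoded GroupValueRead binary that init/1 stored in the state together with interval_ms.

```elixir
defmodule KnxApp.Poller do
  # Hypothetical callbacks. `read_telegram` and `interval_ms` are fields
  # this sketch expects in the state; they are not defined by the behaviour.

  # Arms a timer that delivers :poll to the current process.
  def schedule(interval_ms), do: Process.send_after(self(), :poll, interval_ms)

  # On each tick, re-arm the timer and send the stored read request.
  def handle_info(:poll, state) do
    schedule(state.interval_ms)
    {:send_telegram, state.read_telegram, state}
  end

  # Any other message is ignored.
  def handle_info(_message, state), do: {:noreply, state}
end
```

The first timer would typically be armed from on_connect/1. A poll that fires while an earlier telegram is still unacknowledged is discarded like any other telegram.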

init(args)
init(args :: term()) ::
  {:ok, state :: term()} | :ignore | {:stop, reason :: term()}

Invoked when the tunnel process is started. start_link/4 will block until it returns.

args is the module_args term passed to start_link/4 (second argument).

Returning {:ok, state} will cause start_link/4 to return {:ok, pid} and the process to enter its loop. Immediately after entering the loop, the process will attempt to establish a connection to the tunnel server.

Returning :ignore will cause start_link/4 to return :ignore and the process will exit normally without entering the loop or calling terminate/2.

Returning {:stop, reason} will cause start_link/4 to return {:error, reason} and the process to exit with reason reason without entering the loop or calling terminate/2.
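
As a small illustration of the return values above, the following sketch validates its arguments and refuses to start when they are unusable. KnxApp.InitSketch and the error atoms are hypothetical; the option names mirror the example section.

```elixir
defmodule KnxApp.InitSketch do
  # Hypothetical init/1 that validates module_args before starting.
  def init(opts) do
    case Keyword.fetch(opts, :group_addresses) do
      {:ok, group_addresses} when is_map(group_addresses) ->
        # Valid arguments: start_link/4 would return {:ok, pid}.
        {:ok, %{parent: Keyword.get(opts, :parent), group_addresses: group_addresses}}

      {:ok, _other} ->
        # Wrong shape: start_link/4 would return {:error, :invalid_group_addresses}.
        {:stop, :invalid_group_addresses}

      :error ->
        # Option missing entirely.
        {:stop, :missing_group_addresses}
    end
  end
end
```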

on_connect(state)
on_connect(state :: term()) ::
  {:send_telegram, telegram :: binary(), state :: term()}
  | {:ok, state :: term()}

Called when the process successfully establishes a connection to the tunnel server.

Returning {:send_telegram, telegram, state} will send the telegram to the tunnel server in a TUNNELLING_REQUEST, and then continue the loop with new state state.

Returning {:ok, state} will continue the loop with new state state.

on_disconnect(reason, state)
on_disconnect(reason :: disconnect_reason(), state :: term()) ::
  {:backoff, timeout :: timeout(), state :: term()}

Called when the process fails to establish a connection to the tunnel server, or if the process disconnects due to a protocol error.

The callback must return a tuple matching {:backoff, timeout, state}, where timeout is the number of milliseconds to wait before reconnecting. Return {:backoff, 0, state} to reconnect instantly.
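
The backoff value does not have to be constant. The sketch below shows one possible policy, exponential backoff with a cap, under the assumption that the state carries a hypothetical attempts counter that on_connect/1 resets. KnxApp.Backoff and the chosen delays are illustrative only.

```elixir
defmodule KnxApp.Backoff do
  # Hypothetical reconnect policy: 1 s, 2 s, 4 s, ... capped at 30 s.
  @base_ms 1_000
  @max_ms 30_000

  # A successful connection resets the failure counter.
  def on_connect(state), do: {:ok, %{state | attempts: 0}}

  # A requested disconnect is not a failure, so reconnect right away.
  def on_disconnect(:disconnect_requested, state), do: {:backoff, 0, state}

  # Any protocol error doubles the delay, up to the cap.
  def on_disconnect(_reason, state) do
    # Bound the shift so the intermediate value stays small.
    factor = Bitwise.bsl(1, min(state.attempts, 5))
    delay = min(@base_ms * factor, @max_ms)
    {:backoff, delay, %{state | attempts: state.attempts + 1}}
  end
end
```

Whether a given disconnect_reason() deserves an immediate retry or a delay depends on the installation; the split used here is just one reasonable choice.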

on_telegram(telegram, state)
on_telegram(telegram :: binary(), state :: term()) ::
  {:send_telegram, telegram :: binary(), state :: term()}
  | {:ok, state :: term()}

Called when the process receives a new TUNNELLING_REQUEST.

The callback should return quickly: the behaviour sends the TUNNELLING_ACK only after the callback has returned, and the tunnel server waits only 1 second for the TUNNELLING_ACK.

Note that this callback is not invoked multiple times if the TUNNELLING_REQUEST is a duplicate, or if the TUNNELLING_REQUEST is out of sequence.

Returning {:send_telegram, telegram, state} will send the telegram to the tunnel server in a TUNNELLING_REQUEST, and then continue the loop with new state state.

Returning {:ok, state} will continue the loop with new state state.
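
One way to keep this callback short is to hand the raw telegram to another process and return immediately. The sketch below assumes a hypothetical worker pid stored in the state and a made-up {:knx_telegram, binary} message format; decoding and any slow work would then happen in the worker.

```elixir
defmodule KnxApp.FastTelegram do
  # Hypothetical callback: forwards the raw binary and returns at once,
  # so the TUNNELLING_ACK is not held up by decoding or slow work.
  def on_telegram(telegram, state) do
    send(state.worker, {:knx_telegram, telegram})
    {:ok, state}
  end
end
```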

on_telegram_ack(state)
on_telegram_ack(state :: term()) ::
  {:send_telegram, telegram :: binary(), state :: term()}
  | {:ok, state :: term()}

Called when the process receives a TUNNELLING_ACK which matches the last sent TUNNELLING_REQUEST.

Note that the callback is only invoked if the TUNNELLING_ACK does not indicate an error. If the TUNNELLING_REQUEST is not successfully acked, this is treated as a protocol error. The connection will be closed, and on_disconnect/2 will be invoked.

Returning {:send_telegram, telegram, state} will send the telegram to the tunnel server in a TUNNELLING_REQUEST, and then continue the loop with new state state.

Returning {:ok, state} will continue the loop with new state state.

terminate(reason, state)
terminate(reason :: term(), state :: term()) :: term()

This callback is the same as Connection.terminate/2 and is called when the process terminates. The first argument is the reason the process is about to exit with.