View Source ExAliyunOts.Tunnel.Worker (ex_aliyun_ots v0.15.2)

The primary entry to use the tunnel service.

This bases on a GenServer module and runs under a DynamicSupervisor to maintain the completed life cycle of the heartbeat connection, it is charge of forward the received data records to the outside subscriber.

Here is an example for reference:

defmodule Sync do
  use GenServer

  def start(instance, tunnel_id) do
    GenServer.start(__MODULE__, %{instance: instance, tunnel_id: tunnel_id})
  end

  def listen(pid) do
    GenServer.cast(pid, :listen)
  end

  @impl true
  def init(config) do
    {:ok, config}
  end

  @impl true
  def handle_cast(:listen, state) do
    ExAliyunOts.Tunnel.Worker.start_connect(state.instance, [tunnel_id: state.tunnel_id])
    {:noreply, state}
  end

  @impl true
  def handle_call({:record_event, {records, _next_token}}, _from, state) do
    # ...
    # data records changes will be received in here,
    # and handle them in the `continue` processing,
    # please inspect `records` for details.
    # ...
    {:reply, :ok, state, {:continue, records}}
  end

  @impl true
  def handle_continue([record | reset] = records, state) do
    # process `records` in here
    {:noreply, state, {:continue, reset}}
  end
  def handle_continue([], state) do
    {:noreply, state}
  end
end

Here we define a module named Sync, and use it like this:

instance = :my_instance
tunnel_id = "..."
{:ok, pid} = Sync.start(instance, tunnel_id)
Sync.listen(pid)
Process.sleep(:infinity)

Or we can use this Sync module with other process/supervisor.

Summary

Functions

Returns a specification to start this module under a supervisor.

Callback implementation for GenServer.init/1.

Start a supervised tunnel worker which maintains a heartbeat connection to the tunnel service internally, it will make the current process who calls this function as a subscriber, the subscriber will receive data records status information via handle_call/3 in {:record_event, {records, next_token}} message format later once there are some data records changed with insert/update/delete operations, please inspect records for details.

Used for the supervisor.

Stop the tunnel worker process by tunnel id.

Functions

Returns a specification to start this module under a supervisor.

See Supervisor.

Callback implementation for GenServer.init/1.

Link to this function

start_connect(instance, opts)

View Source
@spec start_connect(instance :: atom(), opts :: Keyword.t()) :: {:ok, pid()}

Start a supervised tunnel worker which maintains a heartbeat connection to the tunnel service internally, it will make the current process who calls this function as a subscriber, the subscriber will receive data records status information via handle_call/3 in {:record_event, {records, next_token}} message format later once there are some data records changed with insert/update/delete operations, please inspect records for details.

Options

  • :tunnel_id, required, the tunnel id to setup the tunnel working flow.
  • :heartbeat_interval, time to run heartbeat internally, defaults to 30 seconds, at least 5 seconds.
  • :heartbeat_timeout, heartbeat timeout, defaults to 300 seconds.
  • :connect_timeout, timeout setting in "ClientConfig" tunnel proto file, defaults to 300 seconds.
  • :client_tag, optional, the custom client tag that is used to generate a tunnel client id, can customize this parameter to uniquely identify tunnel workers.

Used for the supervisor.

@spec stop(tunnel_id :: String.t()) :: :ok

Stop the tunnel worker process by tunnel id.