Task and gen_tcp
In this chapter, we are going to learn how to use Erlang's :gen_tcp module to serve requests. This provides a great opportunity to explore Elixir's Task module. In future chapters, we will expand our server so that it can actually interact with buckets.
Echo server
We will start our TCP server by first implementing an echo server. It will send a response with the text it received in the request. We will slowly improve our server until it is supervised and ready to handle multiple connections.
A TCP server, in broad strokes, performs the following steps:
- Listens to a port until the port is available and it gets hold of the socket
- Waits for a client connection on that port and accepts it
- Reads the client request and writes a response back
Let's implement those steps. Create a new lib/kv/server.ex and add the following functions:
defmodule KV.Server do
  require Logger

  def accept(port) do
    # The options below mean:
    #
    # 1. `:binary` - receives data as binaries (instead of lists)
    # 2. `packet: :line` - receives data line by line
    # 3. `active: false` - blocks on `:gen_tcp.recv/2` until data is available
    # 4. `reuseaddr: true` - allows us to reuse the address if the listener crashes
    #
    {:ok, socket} =
      :gen_tcp.listen(port, [:binary, packet: :line, active: false, reuseaddr: true])

    Logger.info("Accepting connections on port #{port}")
    loop_acceptor(socket)
  end

  defp loop_acceptor(socket) do
    {:ok, client} = :gen_tcp.accept(socket)
    serve(client)
    loop_acceptor(socket)
  end

  defp serve(socket) do
    socket
    |> read_line()
    |> write_line(socket)

    serve(socket)
  end

  defp read_line(socket) do
    {:ok, data} = :gen_tcp.recv(socket, 0)
    data
  end

  defp write_line(line, socket) do
    :gen_tcp.send(socket, line)
  end
end
We are going to start our server by calling KV.Server.accept(4040), where 4040 is the port. The first step in accept/1 is to listen to the port until the socket becomes available and then call loop_acceptor/1. loop_acceptor/1 is a loop accepting client connections. For each accepted connection, we call serve/1.
serve/1 is another loop that reads a line from the socket and writes that line back to the socket. Note that the serve/1 function uses the pipe operator |>/2 to express this flow of operations. The pipe operator evaluates the left side and passes its result as the first argument to the function on the right side. The example above:
socket |> read_line() |> write_line(socket)
is equivalent to:
write_line(read_line(socket), socket)
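To see the same equivalence outside of the server, here is a minimal sketch. The PipeDemo module and its shout/1 and wrap/2 functions are hypothetical helpers that exist only for this illustration:

```elixir
# Hypothetical helper module, used only to illustrate the pipe operator.
defmodule PipeDemo do
  def shout(text), do: String.upcase(text)
  def wrap(text, marker), do: marker <> text <> marker
end

# The piped form passes each result as the first argument of the next call.
piped = "hello" |> PipeDemo.shout() |> PipeDemo.wrap("*")

# The nested form spells out the same calls from the inside out.
nested = PipeDemo.wrap(PipeDemo.shout("hello"), "*")

IO.inspect(piped == nested)
```

Both expressions perform the same calls in the same order. The piped version simply reads top to bottom instead of inside out.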
The read_line/1 implementation receives data from the socket using :gen_tcp.recv/2 and write_line/2 writes to the socket using :gen_tcp.send/2.
Note that serve/1 is an infinite loop called sequentially inside loop_acceptor/1, so the tail call to loop_acceptor/1 is never reached and could be avoided. However, as we shall see, we will need to execute serve/1 in a separate process, so we will need that tail call soon.
This is pretty much all we need to implement our echo server. Let's give it a try!
Start an IEx session inside the kv application with iex -S mix. Inside IEx, run:
iex> KV.Server.accept(4040)
The server is now running, and you will even notice the console is blocked. Let's use a telnet client to access our server. There are clients available on most operating systems, and their command lines are generally similar:
$ telnet 127.0.0.1 4040
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
hello
hello
is it me
is it me
you are looking for?
you are looking for?
Type "hello", press enter, and you will get "hello" back. Excellent!
My particular telnet client can be exited by typing ctrl + ], typing quit, and pressing <Enter>, but your client may require different steps.
Once you exit the telnet client, you will likely see an error in the IEx session:
** (MatchError) no match of right hand side value: {:error, :closed}
(kv) lib/kv/server.ex:45: KV.Server.read_line/1
(kv) lib/kv/server.ex:37: KV.Server.serve/1
(kv) lib/kv/server.ex:30: KV.Server.loop_acceptor/1
That's because we were expecting data from :gen_tcp.recv/2 but the client closed the connection. We need to handle such cases better in future revisions of our server.
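As a rough idea of what handling a closed connection could look like, here is a minimal sketch. It is an assumption for illustration, not necessarily the approach the guide takes later. The EchoSketch module is hypothetical, and the script exercises it over a loopback connection on a port chosen by the operating system:

```elixir
defmodule EchoSketch do
  # Hypothetical variant of serve/1 that stops looping when the client disconnects.
  # Other errors (for example {:error, :econnreset}) are not handled in this sketch.
  def serve(socket) do
    case :gen_tcp.recv(socket, 0) do
      {:ok, line} ->
        :gen_tcp.send(socket, line)
        serve(socket)

      {:error, :closed} ->
        # The client hung up, so we close our side and return normally.
        :gen_tcp.close(socket)
        :closed
    end
  end
end

# Port 0 asks the operating system for any free port.
{:ok, listener} =
  :gen_tcp.listen(0, [:binary, packet: :line, active: false, reuseaddr: true])

{:ok, port} = :inet.port(listener)

# Accept and serve a single client in a separate process.
server =
  Task.async(fn ->
    {:ok, client} = :gen_tcp.accept(listener)
    EchoSketch.serve(client)
  end)

# Act as the client: send one line, read the echo, then disconnect.
{:ok, conn} = :gen_tcp.connect({127, 0, 0, 1}, port, [:binary, packet: :line, active: false])
:ok = :gen_tcp.send(conn, "hello\n")
{:ok, echoed} = :gen_tcp.recv(conn, 0)
:ok = :gen_tcp.close(conn)

result = Task.await(server)
IO.inspect({echoed, result})
```

Matching on the result of :gen_tcp.recv/2 with case, instead of asserting {:ok, data}, lets the serving process finish normally when the client goes away.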
For now, there is a more important bug we need to fix: what happens if our TCP acceptor crashes? Since there is no supervision, the server dies and we won't be able to serve more requests, because it won't be restarted. That's why we must move our server to a supervision tree.
Tasks
Whenever you have an existing function and you simply want to execute it when your application starts, the Task module is exactly what you need. For example, it has a Task.start_link/1 function that receives an anonymous function and executes it inside a new process that will be part of a supervision tree.
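Outside of a supervision tree, the behavior of Task.start_link/1 can be sketched as follows. The message shape {:done, pid} is an arbitrary choice made for this example:

```elixir
parent = self()

# Task.start_link/1 runs the function in a new process linked to the caller.
{:ok, pid} = Task.start_link(fn -> send(parent, {:done, self()}) end)

# Wait for the task to report back with its own pid.
reply =
  receive do
    {:done, task_pid} -> task_pid
  after
    1_000 -> :timeout
  end

IO.inspect(reply == pid)
```

The function runs in a separate process, which is why the pid sent in the message matches the pid returned by Task.start_link/1 rather than the caller's pid.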
Let's give it a try. Open up lib/kv.ex and let's add a new child:
def start(_type, _args) do
  children = [
    {Registry, name: KV, keys: :unique},
    {DynamicSupervisor, name: KV.BucketSupervisor, strategy: :one_for_one},
    {Task, fn -> KV.Server.accept(4040) end}
  ]

  Supervisor.start_link(children, strategy: :one_for_one)
end
With this change, we are saying that we want to run KV.Server.accept(4040) as a task. We are hardcoding the port for now but we will make this a configuration in later chapters. As usual, we've passed a two-element tuple as a child specification, which in turn will invoke Task.start_link/1.
Now that the server is part of the supervision tree, it should start automatically when we run the application. Run iex -S mix to boot the app and use the telnet client to make sure that everything still works:
$ telnet 127.0.0.1 4040
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
say you
say you
say me
say me
Yes, it works! However, can it handle more than one client?
Try to connect two telnet clients at the same time. When you do so, you will notice that the second client doesn't echo:
$ telnet 127.0.0.1 4040
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
hello
hello?
HELLOOOOOO?
It doesn't seem to work at all. That's because we are serving requests in the same process that is accepting connections. When one client is connected, we can't accept another client.
Adding (flawed) concurrency
In order to make our server handle simultaneous connections, we need to have one process working as an acceptor that spawns other processes to serve requests. One solution would be to change:
defp loop_acceptor(socket) do
  {:ok, client} = :gen_tcp.accept(socket)
  serve(client)
  loop_acceptor(socket)
end
to also use Task.start_link/1:
defp loop_acceptor(socket) do
  {:ok, client} = :gen_tcp.accept(socket)
  {:ok, pid} = Task.start_link(fn -> serve(client) end)
  :ok = :gen_tcp.controlling_process(client, pid)
  loop_acceptor(socket)
end
In the new acceptor loop, we are starting a new task every time there is a new client. Now, if you attempt to connect two clients at the same time, it should work!
Or does it? For example, what happens when you exit one telnet session? The other session should crash! The reason for this crash is twofold:
- We have a bug in our server where we don't expect :gen_tcp.recv/2 to return an {:error, :closed} tuple
- Because each server task is linked to the acceptor process, if one task crashes, the acceptor process will also crash, taking down all other tasks and clients
An important rule of thumb throughout this guide is to always start processes as children of supervisors. The code above is an excellent example of what happens when we don't. If we don't isolate the different parts of our systems, failures can cascade through our system, as would happen in other languages.
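The cascade can be observed in isolation with a small sketch. Here a stand-in "acceptor" process (not our real KV.Server) starts a linked task that raises, and we monitor the acceptor from the outside. Expect an error report to be logged when the task crashes:

```elixir
# A stand-in acceptor that starts a linked task which crashes.
# spawn_monitor/1 does not link, so this script survives and can observe the exit.
{acceptor, ref} =
  spawn_monitor(fn ->
    {:ok, _task} = Task.start_link(fn -> raise "client handler crashed" end)
    # Block forever, like an acceptor waiting for connections.
    Process.sleep(:infinity)
  end)

# The acceptor never returns on its own, so a :DOWN message means the
# crash of the linked task propagated to it.
outcome =
  receive do
    {:DOWN, ^ref, :process, ^acceptor, _reason} -> :acceptor_died
  after
    1_000 -> :acceptor_alive
  end

IO.inspect(outcome)
```

Because the acceptor does not trap exits, the exit signal from the linked task terminates it as well, which is the failure mode described in the second bullet above.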
To fix this, we could use a DynamicSupervisor, but tasks also provide a specialized Task.Supervisor which has better ergonomics and is optimized for supervising tasks themselves. Let's give it a try.
Adding a task supervisor
Let's change start/2 in lib/kv.ex once more, to add the task supervisor to our tree:
def start(_type, _args) do
  children = [
    {Registry, name: KV, keys: :unique},
    {DynamicSupervisor, name: KV.BucketSupervisor, strategy: :one_for_one},
    {Task.Supervisor, name: KV.ServerSupervisor},
    {Task, fn -> KV.Server.accept(4040) end}
  ]

  Supervisor.start_link(children, strategy: :one_for_one)
end
We'll now start a Task.Supervisor process with name KV.ServerSupervisor. Keep in mind that the order in which children are started matters. For example, the acceptor must come last because, if it comes first, it means our application can start accepting requests before the Task.Supervisor is running or before we can locate buckets. Shutting down an application will also stop the children in reverse order, guaranteeing a clean termination.
Now we need to change loop_acceptor/1 to use Task.Supervisor to serve each request:
defp loop_acceptor(socket) do
  {:ok, client} = :gen_tcp.accept(socket)
  {:ok, pid} = Task.Supervisor.start_child(KV.ServerSupervisor, fn -> serve(client) end)
  :ok = :gen_tcp.controlling_process(client, pid)
  loop_acceptor(socket)
end
You might notice that we added a line, :ok = :gen_tcp.controlling_process(client, pid). This makes the child process the "controlling process" of the client socket. If we didn't do this, the acceptor would bring down all the clients if it crashed because sockets would be tied to the process that accepted them (which is the default behavior).
Now start a new server with iex -S mix and try to open up many concurrent telnet clients. You will notice that quitting a client does not bring the acceptor down, even though we haven't yet fixed the bug in how we handle the result of :gen_tcp.recv/2 (which we will address in the next chapter). Excellent!
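The isolation provided by Task.Supervisor can also be checked on its own. In the sketch below, IsolationDemo.TaskSupervisor is a hypothetical name used only for this example. The task crashes, an error report is logged, and neither the caller nor the supervisor goes down:

```elixir
# Hypothetical supervisor name, used only for this sketch.
{:ok, _sup} = Task.Supervisor.start_link(name: IsolationDemo.TaskSupervisor)

# The task is linked to the supervisor rather than to the calling process.
{:ok, pid} =
  Task.Supervisor.start_child(IsolationDemo.TaskSupervisor, fn ->
    raise "client handler crashed"
  end)

# Wait until the task has terminated.
ref = Process.monitor(pid)

task_outcome =
  receive do
    {:DOWN, ^ref, :process, ^pid, _reason} -> :task_died
  after
    1_000 -> :task_still_running
  end

# The supervisor is still registered and alive after the crash.
supervisor_alive = Process.alive?(Process.whereis(IsolationDemo.TaskSupervisor))

IO.inspect({task_outcome, supervisor_alive})
```

Since the crashing task is a child of the supervisor, its failure stays contained there instead of propagating to whoever asked for the task to be started.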
Restart strategies
There is one important topic we haven't explored yet with the necessary depth. What happens when a supervised process crashes?
In the previous chapter, when we started a bucket and killed it, the supervisor automatically started one in its place:
iex> children = [{KV.Bucket, name: :shopping}]
iex> Supervisor.start_link(children, strategy: :one_for_one)
iex> KV.Bucket.put(:shopping, "milk", 1)
iex> pid = Process.whereis(:shopping)
#PID<0.48.0>
iex> Process.exit(pid, :kill)
true
iex> Process.whereis(:shopping)
#PID<0.50.0>
What exactly happens when a process terminates is part of its child specification. For KV.Bucket, we have this:
iex> KV.Bucket.child_spec([])
%{id: KV.Bucket, start: {KV.Bucket, :start_link, [[]]}}
However, for tasks, we have this:
iex> Task.child_spec(fn -> :ok end)
%{
  id: Task,
  restart: :temporary,
  start: {Task, :start_link, [#Function<43.39164016/0 in :erl_eval.expr/6>]}
}
Notice that a task says :restart is :temporary. KV.Bucket says nothing, which means it defaults to :permanent. :temporary means that a process is never restarted, regardless of why it crashed. :permanent means a process is always restarted, regardless of the exit reason. There is also :transient, which means it won't be restarted as long as it terminates successfully.
Now we must ask ourselves, are those the correct settings?
For KV.Bucket, using :permanent seems logical, as we should not require the user to recreate a bucket they have previously created. Although currently we would lose the bucket data, in an actual system we would add mechanisms to recover it on initialization. However, for tasks, we have used them in two opposing ways in this chapter, which means at least one of them is wrong.
We use a task to start the acceptor. The acceptor is a critical component of our infrastructure. If it crashes, it means we won't accept further requests, and our server would then be useless as no one can connect to it. On the other hand, we also use Task.Supervisor to start tasks that deal with each connection. In this case, restarting may not be useful at all, given the reason we crashed could just as well be a connection issue, and attempting to restart over the same connection would lead to further failures.
Therefore, we want the acceptor to actually run in :permanent mode, while the tasks started under the Task.Supervisor remain :temporary. Luckily Elixir has an API that allows us to change an existing child specification, which we use below.
Let's change start/2 in lib/kv.ex once more to the following:
def start(_type, _args) do
  children = [
    {Registry, name: KV, keys: :unique},
    {DynamicSupervisor, name: KV.BucketSupervisor, strategy: :one_for_one},
    {Task.Supervisor, name: KV.ServerSupervisor},
    Supervisor.child_spec({Task, fn -> KV.Server.accept(4040) end}, restart: :permanent)
  ]

  Supervisor.start_link(children, strategy: :one_for_one)
end
Now we have an always running acceptor that starts temporary task processes under an always running task supervisor.
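To confirm what Supervisor.child_spec/2 changes, we can compare the default task specification with the overridden one. This is a small sketch using a placeholder function (fn -> :ok end) rather than our real acceptor:

```elixir
# The default child specification for a task.
task_spec = Task.child_spec(fn -> :ok end)

# The same specification with only the :restart value overridden.
acceptor_spec = Supervisor.child_spec({Task, fn -> :ok end}, restart: :permanent)

IO.inspect(task_spec.restart)
IO.inspect(acceptor_spec.restart)
IO.inspect(acceptor_spec.id)
```

Only the :restart key differs between the two maps. The :id and :start entries still come from Task.child_spec/1.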
Leveraging the ecosystem
In this chapter, we implemented a basic TCP acceptor while exploring concurrency and fault-tolerance. Our acceptor can manage concurrent connections, but it is still not ready for production. Production-ready TCP servers run a pool of acceptors, each with their own supervisor. Elixir's PartitionSupervisor might be used to partition and scale the acceptor, but it is out of scope for this guide. In practice, you will use existing packages tailored for this use-case, such as Ranch (in Erlang) or Thousand Island (in Elixir).
In the next chapter, we will start parsing the client requests and sending responses, finishing our server.