今天实现服务器的第二个部件 - gate_server
根据[开篇架构设想](/posts/mmo-server-from-scratch/2022/20220608-mmo-server-from-scratch(0)-introduction.md)中的想法,gate_servergate_server 是用来接受用户连接的服务器。客户端通过 TCP Socket 的方式连接到 gate_servergate_server 上来, gate_servergate_server 负责把客户端发来的消息转发到指定的业务相关服务器上,并将服务器产生的消息发回客户端。
所以,根据上面的描述,gate_servergate_server 至少需要具备以下功能:
- 连接
beacon_serverbeacon_server服务器,注册自身并获取自身需要的其他服务器资源
- 监听TCP端口并接受连接
- 接受客户端消息并转发至其他服务器
- 向客户端发送消息
在 Erlang/ElixirErlang/Elixir 中,进程的使用是极其廉价的,这里说的进程不是系统进程,而是 BeamBeam 虚拟机进程,属于用户进程。因此我打算为每个客户端传入的 SocketSocket 连接分配一个 GenServerGenServer ,用于消息交换和状态保存。
对于消息协议,我选择了Protobuf,因此 gate_servergate_server 还需要具备消息的编解码能力。
首先建立本节的 gate_servergate_server 项目:
bash
2
mix new gate_server --sup
2
mix new gate_server --sup
bash
2
mix new gate_server --sup
2
mix new gate_server --sup
为了实现以上功能,需要划分几个模块:一个 InterfaceInterface 模块负责集群相关操作,如注册、加入集群、获取其他服务器节点等;一个 TcpAcceptorTcpAcceptor 模块负责监听端口并接受TCP连接;一个 TcpConnectionTcpConnection 模块负责和客户端进行通信。
按照这个设计,gate_servergate_server应用的监督树如下:
text
5
TcpAcceptorSup InterfaceSup TcpConnectionSup
9
Interface TcpAcceptor TcpConnection x N
5
TcpAcceptorSup InterfaceSup TcpConnectionSup
9
Interface TcpAcceptor TcpConnection x N
text
5
TcpAcceptorSup InterfaceSup TcpConnectionSup
9
Interface TcpAcceptor TcpConnection x N
5
TcpAcceptorSup InterfaceSup TcpConnectionSup
9
Interface TcpAcceptor TcpConnection x N
其中:
- application -
gate_servergate_server主程序
- TcpAcceptorSup -
TCPTCP监听进程监督者进程
- InterfaceSup -
InterfaceInterface模块监督者进程
- TcpConnectionSup -
TcpConnectionTcpConnection模块监督者进程
- Interface - 集群接口模块进程
- TcpAcceptor -
TcpTcp监听模块进程
- TcpConnection - 用户连接进程
关于监督者进程的创建我就不说了,查阅各种文档都可以找到方法。首先来实现一下 InterfaceInterface 的功能。
InterfaceInterface进程的初始化流程:
- 建立
GenServerGenServer
- 连接给定的
beacon_serverbeacon_server节点
- 连接成功后调用
beacon_serverbeacon_server的registerregister接口,注册自身
- 向
beacon_serverbeacon_server请求自身所需的其他节点
受前面实现的 beacon_serverbeacon_server 限制,这个流程姑且就先这样,后续再继续优化。
我不在 initinit 函数中进行以上动作,而是将逻辑放置到 timeouttimeout 消息处理中,使得进程尽快完成初始化开始接收消息。代码如下:
elixir
3
{:ok, %{auth_server: [], server_state: :waiting_requirements}, 0}
7
def handle_info(:timeout, state) do
8
send(self(), :establish_links)
13
def handle_info(:establish_links, state) do
14
Logger.info("===Starting #{Application.get_application(__MODULE__)} node initialization===", ansi_color: :blue)
18
new_state = get_requirements(state)
20
Logger.info("===Server initialization complete, server ready===", ansi_color: :blue)
21
{:noreply, %{new_state | server_state: :ready}}
3
{:ok, %{auth_server: [], server_state: :waiting_requirements}, 0}
7
def handle_info(:timeout, state) do
8
send(self(), :establish_links)
13
def handle_info(:establish_links, state) do
14
Logger.info("===Starting #{Application.get_application(__MODULE__)} node initialization===", ansi_color: :blue)
18
new_state = get_requirements(state)
20
Logger.info("===Server initialization complete, server ready===", ansi_color: :blue)
21
{:noreply, %{new_state | server_state: :ready}}
elixir
3
{:ok, %{auth_server: [], server_state: :waiting_requirements}, 0}
7
def handle_info(:timeout, state) do
8
send(self(), :establish_links)
13
def handle_info(:establish_links, state) do
14
Logger.info("===Starting #{Application.get_application(__MODULE__)} node initialization===", ansi_color: :blue)
18
new_state = get_requirements(state)
20
Logger.info("===Server initialization complete, server ready===", ansi_color: :blue)
21
{:noreply, %{new_state | server_state: :ready}}
3
{:ok, %{auth_server: [], server_state: :waiting_requirements}, 0}
7
def handle_info(:timeout, state) do
8
send(self(), :establish_links)
13
def handle_info(:establish_links, state) do
14
Logger.info("===Starting #{Application.get_application(__MODULE__)} node initialization===", ansi_color: :blue)
18
new_state = get_requirements(state)
20
Logger.info("===Server initialization complete, server ready===", ansi_color: :blue)
21
{:noreply, %{new_state | server_state: :ready}}
join_beacon/0join_beacon/0、register_beacon/0register_beacon/0、get_requirements/1get_requirements/1 三个函数我就不放了,基本需要的东西就是 Node.connect/1Node.connect/1 和 GenServer.call/3GenServer.call/3。
运行效果:
Figure 1: Interface 运行效果
此时 beacon_serverbeacon_server 的 statestate 数据:
Figure 2: beacon_server state 内容
此时如果在 iexiex 中输入:
可以发现我们的 gate_servergate_server 已经和 beacon_serverbeacon_server 建立连接了:
Figure 3: beacon_server state 内容
TcpAcceptorTcpAcceptor 是用来接收 TCPTCP 传入链接的进程,同样是一个 GenServerGenServer。这个进程的逻辑非常简单,直接看代码:
elixir
2
{:ok, socket} = :gen_tcp.listen(port, [:binary, packet: 0, active: true, reuseaddr: true])
4
Logger.debug("Accepting connections on port #{port}")
8
defp loop_acceptor(socket) do
9
{:ok, client} = :gen_tcp.accept(socket)
12
DynamicSupervisor.start_child(
13
GateServer.TcpConnectionSup,
14
{GateServer.TcpConnection, client}
17
:ok = :gen_tcp.controlling_process(client, pid)
2
{:ok, socket} = :gen_tcp.listen(port, [:binary, packet: 0, active: true, reuseaddr: true])
4
Logger.debug("Accepting connections on port #{port}")
8
defp loop_acceptor(socket) do
9
{:ok, client} = :gen_tcp.accept(socket)
12
DynamicSupervisor.start_child(
13
GateServer.TcpConnectionSup,
14
{GateServer.TcpConnection, client}
17
:ok = :gen_tcp.controlling_process(client, pid)
elixir
2
{:ok, socket} = :gen_tcp.listen(port, [:binary, packet: 0, active: true, reuseaddr: true])
4
Logger.debug("Accepting connections on port #{port}")
8
defp loop_acceptor(socket) do
9
{:ok, client} = :gen_tcp.accept(socket)
12
DynamicSupervisor.start_child(
13
GateServer.TcpConnectionSup,
14
{GateServer.TcpConnection, client}
17
:ok = :gen_tcp.controlling_process(client, pid)
2
{:ok, socket} = :gen_tcp.listen(port, [:binary, packet: 0, active: true, reuseaddr: true])
4
Logger.debug("Accepting connections on port #{port}")
8
defp loop_acceptor(socket) do
9
{:ok, client} = :gen_tcp.accept(socket)
12
DynamicSupervisor.start_child(
13
GateServer.TcpConnectionSup,
14
{GateServer.TcpConnection, client}
17
:ok = :gen_tcp.controlling_process(client, pid)
listen/1listen/1 函数用来监听指定的端口,在成功之后开始接受传入 TCPTCP 连接;loop_acceptor/1loop_acceptor/1 函数用来循环接受 TCPTCP 连接,一旦接受一个连接,则为其创建一个 TcpConnectionTcpConnection 进程,并将该链接的 socketsocket 控制权转交给新生成的 TcpConnectionTcpConnection 进程。在 ElixirElixir 中,尾递归可以被优化,不会无限制占用栈空间,因此在函数的最末尾进行递归调用实现循环接受 TCPTCP 连接。
这是本节最核心的功能模块,负责与客户端的一切通信。
TcpConnectionTcpConnection 同样是一个 GenServerGenServer,用于保存一些状态和传输 TCPTCP 数据。其初始化函数如下:
elixir
3
Logger.debug("New client connected.")
4
{:ok, %{socket: socket, status: :waiting_auth}}
3
Logger.debug("New client connected.")
4
{:ok, %{socket: socket, status: :waiting_auth}}
elixir
3
Logger.debug("New client connected.")
4
{:ok, %{socket: socket, status: :waiting_auth}}
3
Logger.debug("New client connected.")
4
{:ok, %{socket: socket, status: :waiting_auth}}
可以看到,我们把 socketsocket 信息存入了进程的 statestate 中,方便后续调取。此处的 statusstatus 属性暂且抛开不管,用于后续用户的鉴权,本节不作讨论。
接下来是本进程的核心函数 - TCPTCP 消息接收函数:
elixir
2
def handle_info({:tcp, _socket, data}, %{socket: socket} = state) do
3
result = "You've typed: #{data}"
4
send_data(senddata, socket)
2
def handle_info({:tcp, _socket, data}, %{socket: socket} = state) do
3
result = "You've typed: #{data}"
4
send_data(senddata, socket)
elixir
2
def handle_info({:tcp, _socket, data}, %{socket: socket} = state) do
3
result = "You've typed: #{data}"
4
send_data(senddata, socket)
2
def handle_info({:tcp, _socket, data}, %{socket: socket} = state) do
3
result = "You've typed: #{data}"
4
send_data(senddata, socket)
此处为了方便测试,先写一个简单的 echo 功能。GenServerGenServer 还贴心地提供了对 TCPTCP 连接状态变化的处理,只需实现以下两个函数:
elixir
2
def handle_info({:tcp_closed, _conn}, state) do
3
Logger.error("Socket #{inspect(state.socket, pretty: true)} closed unexpectly.")
4
DynamicSupervisor.terminate_child(GateServer.TcpConnectionSup, self())
6
{:stop, :normal, state}
10
def handle_info({:tcp_error, _conn, err}, state) do
11
Logger.error("Socket #{inspect(state.socket, pretty: true)} error: #{err}")
12
DynamicSupervisor.terminate_child(GateServer.TcpConnectionSup, self())
14
{:stop, :normal, state}
2
def handle_info({:tcp_closed, _conn}, state) do
3
Logger.error("Socket #{inspect(state.socket, pretty: true)} closed unexpectly.")
4
DynamicSupervisor.terminate_child(GateServer.TcpConnectionSup, self())
6
{:stop, :normal, state}
10
def handle_info({:tcp_error, _conn, err}, state) do
11
Logger.error("Socket #{inspect(state.socket, pretty: true)} error: #{err}")
12
DynamicSupervisor.terminate_child(GateServer.TcpConnectionSup, self())
14
{:stop, :normal, state}
elixir
2
def handle_info({:tcp_closed, _conn}, state) do
3
Logger.error("Socket #{inspect(state.socket, pretty: true)} closed unexpectly.")
4
DynamicSupervisor.terminate_child(GateServer.TcpConnectionSup, self())
6
{:stop, :normal, state}
10
def handle_info({:tcp_error, _conn, err}, state) do
11
Logger.error("Socket #{inspect(state.socket, pretty: true)} error: #{err}")
12
DynamicSupervisor.terminate_child(GateServer.TcpConnectionSup, self())
14
{:stop, :normal, state}
2
def handle_info({:tcp_closed, _conn}, state) do
3
Logger.error("Socket #{inspect(state.socket, pretty: true)} closed unexpectly.")
4
DynamicSupervisor.terminate_child(GateServer.TcpConnectionSup, self())
6
{:stop, :normal, state}
10
def handle_info({:tcp_error, _conn, err}, state) do
11
Logger.error("Socket #{inspect(state.socket, pretty: true)} error: #{err}")
12
DynamicSupervisor.terminate_child(GateServer.TcpConnectionSup, self())
14
{:stop, :normal, state}
这里我们就可以对 TCPTCP 的连接建立以及信息收发功能进行测试了。假设我的 gate_servergate_server 运行在本机 2900029000 端口上,我们运行 TelnetTelnet:
即可开始与服务器进行通信。测试结果:
Figure 4: beacon_server state 内容
gate_servergate_server 到这里暂告一段落,一个能够接受 TCPTCP 客户端连接并且进行高并发通信的服务器就基本完成了。下一步我们将继续实现消息协议相关内容,引入 ProtobufProtobuf 消息库并进行消息分发,进一步完善网关服务器功能。此部分功能等到下一步实现场景服务器 scene_serverscene_server 后再回来继续,敬请期待!