MMO Server From Scratch(2) - Gate Server



今天实现服务器的第二个部件 - gate_server

功能解析

根据[开篇架构设想](/posts/mmo-server-from-scratch/2022/20220608-mmo-server-from-scratch(0)-introduction.md)中的想法,gate_servergate_server 是用来接受用户连接的服务器。客户端通过 TCP Socket 的方式连接到 gate_servergate_server 上来, gate_servergate_server 负责把客户端发来的消息转发到指定的业务相关服务器上,并将服务器产生的消息发回客户端。

所以,根据上面的描述,gate_servergate_server 至少需要具备以下功能:

  1. 连接beacon_serverbeacon_server服务器,注册自身并获取自身需要的其他服务器资源
  2. 监听TCP端口并接受连接
  3. 接受客户端消息并转发至其他服务器
  4. 向客户端发送消息

Erlang/ElixirErlang/Elixir 中,进程的使用是极其廉价的,这里说的进程不是系统进程,而是 BeamBeam 虚拟机进程,属于用户进程。因此我打算为每个客户端传入的 SocketSocket 连接分配一个 GenServerGenServer ,用于消息交换和状态保存。

对于消息协议,我选择了Protobuf,因此 gate_servergate_server 还需要具备消息的编解码能力。

简要实现

建立项目

首先建立本节的 gate_servergate_server 项目:

bash
1
cd apps/
2
mix new gate_server --sup
1
cd apps/
2
mix new gate_server --sup
bash
1
cd apps/
2
mix new gate_server --sup
1
cd apps/
2
mix new gate_server --sup

模块划分

为了实现以上功能,需要划分几个模块:一个 InterfaceInterface 模块负责集群相关操作,如注册、加入集群、获取其他服务器节点等;一个 TcpAcceptorTcpAcceptor 模块负责监听端口并接受TCP连接;一个 TcpConnectionTcpConnection 模块负责和客户端进行通信。

按照这个设计,gate_servergate_server应用的监督树如下:

text
1
                            application
2
                           /     |     \
3
                          /      |      \
4
                         /       |       \
5
           TcpAcceptorSup   InterfaceSup  TcpConnectionSup
6
                  |              |               |  
7
                  |              |               |  
8
                  |              |               |  
9
              Interface     TcpAcceptor   TcpConnection x N
10
              
1
                            application
2
                           /     |     \
3
                          /      |      \
4
                         /       |       \
5
           TcpAcceptorSup   InterfaceSup  TcpConnectionSup
6
                  |              |               |  
7
                  |              |               |  
8
                  |              |               |  
9
              Interface     TcpAcceptor   TcpConnection x N
10
              
text
1
                            application
2
                           /     |     \
3
                          /      |      \
4
                         /       |       \
5
           TcpAcceptorSup   InterfaceSup  TcpConnectionSup
6
                  |              |               |  
7
                  |              |               |  
8
                  |              |               |  
9
              Interface     TcpAcceptor   TcpConnection x N
10
              
1
                            application
2
                           /     |     \
3
                          /      |      \
4
                         /       |       \
5
           TcpAcceptorSup   InterfaceSup  TcpConnectionSup
6
                  |              |               |  
7
                  |              |               |  
8
                  |              |               |  
9
              Interface     TcpAcceptor   TcpConnection x N
10
              

其中:

  • application - gate_servergate_server主程序
  • TcpAcceptorSup - TCPTCP监听进程监督者进程
  • InterfaceSup - InterfaceInterface模块监督者进程
  • TcpConnectionSup - TcpConnectionTcpConnection模块监督者进程
  • Interface - 集群接口模块进程
  • TcpAcceptor - TcpTcp监听模块进程
  • TcpConnection - 用户连接进程

Interface

关于监督者进程的创建我就不说了,查阅各种文档都可以找到方法。首先来实现一下 InterfaceInterface 的功能。

InterfaceInterface进程的初始化流程:

  1. 建立GenServerGenServer
  2. 连接给定的beacon_serverbeacon_server节点
  3. 连接成功后调用beacon_serverbeacon_serverregisterregister接口,注册自身
  4. beacon_serverbeacon_server请求自身所需的其他节点

受前面实现的 beacon_serverbeacon_server 限制,这个流程姑且就先这样,后续再继续优化。

我不在 initinit 函数中进行以上动作,而是将逻辑放置到 timeouttimeout 消息处理中,使得进程尽快完成初始化开始接收消息。代码如下:

elixir
1
@impl true
2
def init(_init_arg) do
3
    {:ok, %{auth_server: [], server_state: :waiting_requirements}, 0}
4
end
5

6
@impl true
7
def handle_info(:timeout, state) do
8
    send(self(), :establish_links)
9
    {:noreply, state}
10
end
11

12
@impl true
13
def handle_info(:establish_links, state) do
14
    Logger.info("===Starting #{Application.get_application(__MODULE__)} node initialization===", ansi_color: :blue)
15
    
16
    join_beacon()
17
    register_beacon()
18
    new_state = get_requirements(state)
19
    
20
    Logger.info("===Server initialization complete, server ready===", ansi_color: :blue)
21
    {:noreply, %{new_state | server_state: :ready}}
22
end
1
@impl true
2
def init(_init_arg) do
3
    {:ok, %{auth_server: [], server_state: :waiting_requirements}, 0}
4
end
5

6
@impl true
7
def handle_info(:timeout, state) do
8
    send(self(), :establish_links)
9
    {:noreply, state}
10
end
11

12
@impl true
13
def handle_info(:establish_links, state) do
14
    Logger.info("===Starting #{Application.get_application(__MODULE__)} node initialization===", ansi_color: :blue)
15
    
16
    join_beacon()
17
    register_beacon()
18
    new_state = get_requirements(state)
19
    
20
    Logger.info("===Server initialization complete, server ready===", ansi_color: :blue)
21
    {:noreply, %{new_state | server_state: :ready}}
22
end
elixir
1
@impl true
2
def init(_init_arg) do
3
    {:ok, %{auth_server: [], server_state: :waiting_requirements}, 0}
4
end
5

6
@impl true
7
def handle_info(:timeout, state) do
8
    send(self(), :establish_links)
9
    {:noreply, state}
10
end
11

12
@impl true
13
def handle_info(:establish_links, state) do
14
    Logger.info("===Starting #{Application.get_application(__MODULE__)} node initialization===", ansi_color: :blue)
15
    
16
    join_beacon()
17
    register_beacon()
18
    new_state = get_requirements(state)
19
    
20
    Logger.info("===Server initialization complete, server ready===", ansi_color: :blue)
21
    {:noreply, %{new_state | server_state: :ready}}
22
end
1
@impl true
2
def init(_init_arg) do
3
    {:ok, %{auth_server: [], server_state: :waiting_requirements}, 0}
4
end
5

6
@impl true
7
def handle_info(:timeout, state) do
8
    send(self(), :establish_links)
9
    {:noreply, state}
10
end
11

12
@impl true
13
def handle_info(:establish_links, state) do
14
    Logger.info("===Starting #{Application.get_application(__MODULE__)} node initialization===", ansi_color: :blue)
15
    
16
    join_beacon()
17
    register_beacon()
18
    new_state = get_requirements(state)
19
    
20
    Logger.info("===Server initialization complete, server ready===", ansi_color: :blue)
21
    {:noreply, %{new_state | server_state: :ready}}
22
end

join_beacon/0join_beacon/0register_beacon/0register_beacon/0get_requirements/1get_requirements/1 三个函数我就不放了,基本需要的东西就是 Node.connect/1Node.connect/1GenServer.call/3GenServer.call/3

运行效果:

Figure 1: Interface 运行效果

此时 beacon_serverbeacon_serverstatestate 数据:

Figure 2: beacon_server state 内容

此时如果在 iexiex 中输入:

elixir
1
Node.list
1
Node.list
elixir
1
Node.list
1
Node.list

可以发现我们的 gate_servergate_server 已经和 beacon_serverbeacon_server 建立连接了:

Figure 3: beacon_server state 内容

TcpAcceptor

TcpAcceptorTcpAcceptor 是用来接收 TCPTCP 传入链接的进程,同样是一个 GenServerGenServer。这个进程的逻辑非常简单,直接看代码:

elixir
1
defp listen(port) do
2
    {:ok, socket} = :gen_tcp.listen(port, [:binary, packet: 0, active: true, reuseaddr: true])
3
    
4
    Logger.debug("Accepting connections on port #{port}")
5
    loop_acceptor(socket)
6
end
7

8
defp loop_acceptor(socket) do
9
    {:ok, client} = :gen_tcp.accept(socket)
10
    
11
    {:ok, pid} =
12
        DynamicSupervisor.start_child(
13
        GateServer.TcpConnectionSup,
14
        {GateServer.TcpConnection, client}
15
        )
16
        
17
    :ok = :gen_tcp.controlling_process(client, pid)
18
    
19
    loop_acceptor(socket)
20
end
1
defp listen(port) do
2
    {:ok, socket} = :gen_tcp.listen(port, [:binary, packet: 0, active: true, reuseaddr: true])
3
    
4
    Logger.debug("Accepting connections on port #{port}")
5
    loop_acceptor(socket)
6
end
7

8
defp loop_acceptor(socket) do
9
    {:ok, client} = :gen_tcp.accept(socket)
10
    
11
    {:ok, pid} =
12
        DynamicSupervisor.start_child(
13
        GateServer.TcpConnectionSup,
14
        {GateServer.TcpConnection, client}
15
        )
16
        
17
    :ok = :gen_tcp.controlling_process(client, pid)
18
    
19
    loop_acceptor(socket)
20
end
elixir
1
defp listen(port) do
2
    {:ok, socket} = :gen_tcp.listen(port, [:binary, packet: 0, active: true, reuseaddr: true])
3
    
4
    Logger.debug("Accepting connections on port #{port}")
5
    loop_acceptor(socket)
6
end
7

8
defp loop_acceptor(socket) do
9
    {:ok, client} = :gen_tcp.accept(socket)
10
    
11
    {:ok, pid} =
12
        DynamicSupervisor.start_child(
13
        GateServer.TcpConnectionSup,
14
        {GateServer.TcpConnection, client}
15
        )
16
        
17
    :ok = :gen_tcp.controlling_process(client, pid)
18
    
19
    loop_acceptor(socket)
20
end
1
defp listen(port) do
2
    {:ok, socket} = :gen_tcp.listen(port, [:binary, packet: 0, active: true, reuseaddr: true])
3
    
4
    Logger.debug("Accepting connections on port #{port}")
5
    loop_acceptor(socket)
6
end
7

8
defp loop_acceptor(socket) do
9
    {:ok, client} = :gen_tcp.accept(socket)
10
    
11
    {:ok, pid} =
12
        DynamicSupervisor.start_child(
13
        GateServer.TcpConnectionSup,
14
        {GateServer.TcpConnection, client}
15
        )
16
        
17
    :ok = :gen_tcp.controlling_process(client, pid)
18
    
19
    loop_acceptor(socket)
20
end

listen/1listen/1 函数用来监听指定的端口,在成功之后开始接受传入 TCPTCP 连接;loop_acceptor/1loop_acceptor/1 函数用来循环接受 TCPTCP 连接,一旦接受一个连接,则为其创建一个 TcpConnectionTcpConnection 进程,并将该链接的 socketsocket 控制权转交给新生成的 TcpConnectionTcpConnection 进程。在 ElixirElixir 中,尾递归可以被优化,不会无限制占用栈空间,因此在函数的最末尾进行递归调用实现循环接受 TCPTCP 连接。

TcpConnection

这是本节最核心的功能模块,负责与客户端的一切通信。

TcpConnectionTcpConnection 同样是一个 GenServerGenServer,用于保存一些状态和传输 TCPTCP 数据。其初始化函数如下:

elixir
1
@impl true
2
def init(socket) do
3
    Logger.debug("New client connected.")
4
    {:ok, %{socket: socket, status: :waiting_auth}}
5
end
1
@impl true
2
def init(socket) do
3
    Logger.debug("New client connected.")
4
    {:ok, %{socket: socket, status: :waiting_auth}}
5
end
elixir
1
@impl true
2
def init(socket) do
3
    Logger.debug("New client connected.")
4
    {:ok, %{socket: socket, status: :waiting_auth}}
5
end
1
@impl true
2
def init(socket) do
3
    Logger.debug("New client connected.")
4
    {:ok, %{socket: socket, status: :waiting_auth}}
5
end

可以看到,我们把 socketsocket 信息存入了进程的 statestate 中,方便后续调取。此处的 statusstatus 属性暂且抛开不管,用于后续用户的鉴权,本节不作讨论。

接下来是本进程的核心函数 - TCPTCP 消息接收函数:

elixir
1
@impl true
2
def handle_info({:tcp, _socket, data}, %{socket: socket} = state) do
3
    result = "You've typed: #{data}"
4
    send_data(senddata, socket)
5
    
6
    {:noreply, state}
7
end
1
@impl true
2
def handle_info({:tcp, _socket, data}, %{socket: socket} = state) do
3
    result = "You've typed: #{data}"
4
    send_data(senddata, socket)
5
    
6
    {:noreply, state}
7
end
elixir
1
@impl true
2
def handle_info({:tcp, _socket, data}, %{socket: socket} = state) do
3
    result = "You've typed: #{data}"
4
    send_data(senddata, socket)
5
    
6
    {:noreply, state}
7
end
1
@impl true
2
def handle_info({:tcp, _socket, data}, %{socket: socket} = state) do
3
    result = "You've typed: #{data}"
4
    send_data(senddata, socket)
5
    
6
    {:noreply, state}
7
end

此处为了方便测试,先写一个简单的 echo 功能。GenServerGenServer 还贴心地提供了对 TCPTCP 连接状态变化的处理,只需实现以下两个函数:

elixir
1
@impl true
2
def handle_info({:tcp_closed, _conn}, state) do
3
    Logger.error("Socket #{inspect(state.socket, pretty: true)} closed unexpectly.")
4
    DynamicSupervisor.terminate_child(GateServer.TcpConnectionSup, self())
5
    
6
    {:stop, :normal, state}
7
end
8

9
@impl true
10
def handle_info({:tcp_error, _conn, err}, state) do
11
    Logger.error("Socket #{inspect(state.socket, pretty: true)} error: #{err}")
12
    DynamicSupervisor.terminate_child(GateServer.TcpConnectionSup, self())
13
    
14
    {:stop, :normal, state}
15
end
1
@impl true
2
def handle_info({:tcp_closed, _conn}, state) do
3
    Logger.error("Socket #{inspect(state.socket, pretty: true)} closed unexpectly.")
4
    DynamicSupervisor.terminate_child(GateServer.TcpConnectionSup, self())
5
    
6
    {:stop, :normal, state}
7
end
8

9
@impl true
10
def handle_info({:tcp_error, _conn, err}, state) do
11
    Logger.error("Socket #{inspect(state.socket, pretty: true)} error: #{err}")
12
    DynamicSupervisor.terminate_child(GateServer.TcpConnectionSup, self())
13
    
14
    {:stop, :normal, state}
15
end
elixir
1
@impl true
2
def handle_info({:tcp_closed, _conn}, state) do
3
    Logger.error("Socket #{inspect(state.socket, pretty: true)} closed unexpectly.")
4
    DynamicSupervisor.terminate_child(GateServer.TcpConnectionSup, self())
5
    
6
    {:stop, :normal, state}
7
end
8

9
@impl true
10
def handle_info({:tcp_error, _conn, err}, state) do
11
    Logger.error("Socket #{inspect(state.socket, pretty: true)} error: #{err}")
12
    DynamicSupervisor.terminate_child(GateServer.TcpConnectionSup, self())
13
    
14
    {:stop, :normal, state}
15
end
1
@impl true
2
def handle_info({:tcp_closed, _conn}, state) do
3
    Logger.error("Socket #{inspect(state.socket, pretty: true)} closed unexpectly.")
4
    DynamicSupervisor.terminate_child(GateServer.TcpConnectionSup, self())
5
    
6
    {:stop, :normal, state}
7
end
8

9
@impl true
10
def handle_info({:tcp_error, _conn, err}, state) do
11
    Logger.error("Socket #{inspect(state.socket, pretty: true)} error: #{err}")
12
    DynamicSupervisor.terminate_child(GateServer.TcpConnectionSup, self())
13
    
14
    {:stop, :normal, state}
15
end

这里我们就可以对 TCPTCP 的连接建立以及信息收发功能进行测试了。假设我的 gate_servergate_server 运行在本机 2900029000 端口上,我们运行 TelnetTelnet

powershell
1
telnet 127.0.0.1 29000
1
telnet 127.0.0.1 29000
powershell
1
telnet 127.0.0.1 29000
1
telnet 127.0.0.1 29000

即可开始与服务器进行通信。测试结果:

Figure 4: beacon_server state 内容

接下来的工作

gate_servergate_server 到这里暂告一段落,一个能够接受 TCPTCP 客户端连接并且进行高并发通信的服务器就基本完成了。下一步我们将继续实现消息协议相关内容,引入 ProtobufProtobuf 消息库并进行消息分发,进一步完善网关服务器功能。此部分功能等到下一步实现场景服务器 scene_serverscene_server 后再回来继续,敬请期待!