MMO Server From Scratch(1) - Beacon Server
今天来实现服务器的第一个部件 - beacon_server。
功能解析
为了建立Elixir集群,需要所有 Beam 节点在启动之时就已经知道一个固定的节点用来连接,之后 Beam 会自动完成节点之间的链接,即默认的全连接全连接模式,所有节点两两之间均有连接。关于这一点我还没有深入思考过有没有必要进行调整,之后看情况再说🤪
因此,为了让服务器集群内的所有节点在启动时都能够连接一个固定节点从而组成集群,这个固定节点就是beacon_serverbeacon_server。
beacon_serverbeacon_server需要有什么功能呢?在经过一番简单思考后,至少需要具备以下几个功能:
- 接受其他节点的连接
- 接受其他节点的注册信息
- 相应其他节点的需求,返回需求节点的信息
这里有两个重要概念:资源(Resource)资源(Resource) 和 需求(Requirement)需求(Requirement)。资源资源指某个节点自身的内容类型,也就是在集群中所处的角色,比如网关服务器的资源就是网关(gate_server);需求需求指某个节点需要的其他节点,比如网关节点需要网关管理节点(gate_manager)来注册自己,数据服务节点需要数据联系节点(data_contact)来把数据库同步到自身。
当一个节点向beacon_serverbeacon_server节点注册时,我们希望它能够向beacon_serverbeacon_server提供自己的节点名称、资源、需求等数据,方便beacon_serverbeacon_server在收到别的节点注册时,能够把已经注册过的节点当做需求返回给别的节点。
数据结构
我用一个 GenServerGenServer 线程负责上面所说的所有工作,利用线程的 statestate 来保存来往节点信息。当前粗略想了想,姑且定义信息存储格式如下:
1
%{
2
nodes: %{
3
"node1@host": :online,
4
"node2@host": :offline
5
},
6
requirements: [
7
%{
8
module: Module.Interface,
9
name: [:requirement_name],
10
node: :"node@host"
11
}
12
],
13
resources: [
14
%{
15
module: Module.Interface,
16
name: :resoutce_name,
17
node: :"node@host"
18
}
19
]
20
}
1
%{
2
nodes: %{
3
"node1@host": :online,
4
"node2@host": :offline
5
},
6
requirements: [
7
%{
8
module: Module.Interface,
9
name: [:requirement_name],
10
node: :"node@host"
11
}
12
],
13
resources: [
14
%{
15
module: Module.Interface,
16
name: :resoutce_name,
17
node: :"node@host"
18
}
19
]
20
}
1
%{
2
nodes: %{
3
"node1@host": :online,
4
"node2@host": :offline
5
},
6
requirements: [
7
%{
8
module: Module.Interface,
9
name: [:requirement_name],
10
node: :"node@host"
11
}
12
],
13
resources: [
14
%{
15
module: Module.Interface,
16
name: :resoutce_name,
17
node: :"node@host"
18
}
19
]
20
}
1
%{
2
nodes: %{
3
"node1@host": :online,
4
"node2@host": :offline
5
},
6
requirements: [
7
%{
8
module: Module.Interface,
9
name: [:requirement_name],
10
node: :"node@host"
11
}
12
],
13
resources: [
14
%{
15
module: Module.Interface,
16
name: :resoutce_name,
17
node: :"node@host"
18
}
19
]
20
}
我用一个字典存储所有信息,分为 nodesnodes、requirementsrequirements以及resourcesresources三部分。
nodesnodes存储所有已经连接的节点和他们的状态,:online:online表示在线正常连接,:offline:offline表示节点断开连接;
requirementsrequirements存储每个节点注册时提供的需求信息。使用列表存储,列表中每个项代表一个节点。项使用字典,存储模块(module)、名称(name)、节点(node)信息。其中名称名称字段,因为有些节点可能会有不只一个需求需求,因此使用列表存储。模块模块字段是为了留着以备后用,目前没什么用……节点节点字段用于获取的节点使用该字段对目标节点发送消息,必不可少。
resourcesresources存储每个节点注册时提供的资源信息,字段与requirementsrequirements完全相同,有一个不同的地方是名称名称字段的数据类型不再是列表,而是原子,因为每个节点只可能属于唯一的一种资源,不可能属于两种以上,因此用一个单一的原子就可以代表了。
简要实现
建立项目
这是第一个实现,在实现之前,我们先建立一个umbrellaumbrella项目,用来存放之后的所有代码:
1
mix new cluster --umbrella
1
mix new cluster --umbrella
1
mix new cluster --umbrella
1
mix new cluster --umbrella
然后创建本节的beacon_serverbeacon_server项目:
1
cd apps/
2
mix new beacon_server --sup
1
cd apps/
2
mix new beacon_server --sup
1
cd apps/
2
mix new beacon_server --sup
1
cd apps/
2
mix new beacon_server --sup
--sup--sup用来生成监督树。
有了项目之后,我们需要建立一个GenServerGenServer,用来充当其他节点用来通信的接口,我们就把他叫做BeaconBeacon好了。
功能函数
根据前面的设想,我们需要下面这么几个函数:
- register(credentials, state) - 用于把注册来的节点信息记录在
statestate中,并将新的statestate返回。 - get_requirements(node, requirements, resources) - 用于向已注册的节点返回其需求。
下面贴上我粗略实现的代码,当然这不会是最终版本,未来还有优化的空间:
1
@spec register({node(), module(), atom(), [atom()]}, map()) :: {:ok, map()}
2
defp register(
3
{node, module, resource, requirement},
4
state = %{nodes: connected_nodes, resources: resources, requirements: requirements}
5
) do
6
Logger.debug("Register: #{node} | #{resource} | #{inspect(requirement)}")
7
8
{:ok,
9
%{
10
state
11
| nodes: add_node(node, connected_nodes),
12
resources: add_resource(node, module, resource, resources),
13
requirements:
14
if requirement != [] do
15
add_requirement(node, module, requirement, requirements)
16
else
17
requirements
18
end
19
}
20
}
21
end
22
23
@spec get_requirements(node(), list(map()), list(map())) :: list(map())
24
defp get_requirements(node, requirements, resources) do
25
req = find_requirements(node, requirements)
26
offer = find_resources(req, resources)
27
offer
28
end
1
@spec register({node(), module(), atom(), [atom()]}, map()) :: {:ok, map()}
2
defp register(
3
{node, module, resource, requirement},
4
state = %{nodes: connected_nodes, resources: resources, requirements: requirements}
5
) do
6
Logger.debug("Register: #{node} | #{resource} | #{inspect(requirement)}")
7
8
{:ok,
9
%{
10
state
11
| nodes: add_node(node, connected_nodes),
12
resources: add_resource(node, module, resource, resources),
13
requirements:
14
if requirement != [] do
15
add_requirement(node, module, requirement, requirements)
16
else
17
requirements
18
end
19
}
20
}
21
end
22
23
@spec get_requirements(node(), list(map()), list(map())) :: list(map())
24
defp get_requirements(node, requirements, resources) do
25
req = find_requirements(node, requirements)
26
offer = find_resources(req, resources)
27
offer
28
end
1
@spec register({node(), module(), atom(), [atom()]}, map()) :: {:ok, map()}
2
defp register(
3
{node, module, resource, requirement},
4
state = %{nodes: connected_nodes, resources: resources, requirements: requirements}
5
) do
6
Logger.debug("Register: #{node} | #{resource} | #{inspect(requirement)}")
7
8
{:ok,
9
%{
10
state
11
| nodes: add_node(node, connected_nodes),
12
resources: add_resource(node, module, resource, resources),
13
requirements:
14
if requirement != [] do
15
add_requirement(node, module, requirement, requirements)
16
else
17
requirements
18
end
19
}
20
}
21
end
22
23
@spec get_requirements(node(), list(map()), list(map())) :: list(map())
24
defp get_requirements(node, requirements, resources) do
25
req = find_requirements(node, requirements)
26
offer = find_resources(req, resources)
27
offer
28
end
1
@spec register({node(), module(), atom(), [atom()]}, map()) :: {:ok, map()}
2
defp register(
3
{node, module, resource, requirement},
4
state = %{nodes: connected_nodes, resources: resources, requirements: requirements}
5
) do
6
Logger.debug("Register: #{node} | #{resource} | #{inspect(requirement)}")
7
8
{:ok,
9
%{
10
state
11
| nodes: add_node(node, connected_nodes),
12
resources: add_resource(node, module, resource, resources),
13
requirements:
14
if requirement != [] do
15
add_requirement(node, module, requirement, requirements)
16
else
17
requirements
18
end
19
}
20
}
21
end
22
23
@spec get_requirements(node(), list(map()), list(map())) :: list(map())
24
defp get_requirements(node, requirements, resources) do
25
req = find_requirements(node, requirements)
26
offer = find_resources(req, resources)
27
offer
28
end
上面代码中用到的其他私有函数我就不贴了,总之就是利用线程 statestate 中的数据返回新的数据。
除了这两个必要的函数,我还想添加两个能够监控节点通断的函数。这两个函数通过 handle_infohandle_info 实现。首先需要在线程初始化的时候开启这项功能:
1
:net_kernel.monitor_nodes(true)
1
:net_kernel.monitor_nodes(true)
1
:net_kernel.monitor_nodes(true)
1
:net_kernel.monitor_nodes(true)
之后实现两个 callback:
1
# ========== Node monitoring ==========
2
3
@impl true
4
def handle_info({:nodeup, node}, state) do
5
Logger.debug("Node connected: #{node}")
6
7
{:noreply, state}
8
end
9
10
@impl true
11
def handle_info({:nodedown, node}, state = %{nodes: node_list}) do
12
Logger.critical("Node disconnected: #{node}")
13
14
{:noreply, %{state | nodes: %{node_list | node => :offline}}}
15
end
1
# ========== Node monitoring ==========
2
3
@impl true
4
def handle_info({:nodeup, node}, state) do
5
Logger.debug("Node connected: #{node}")
6
7
{:noreply, state}
8
end
9
10
@impl true
11
def handle_info({:nodedown, node}, state = %{nodes: node_list}) do
12
Logger.critical("Node disconnected: #{node}")
13
14
{:noreply, %{state | nodes: %{node_list | node => :offline}}}
15
end
1
# ========== Node monitoring ==========
2
3
@impl true
4
def handle_info({:nodeup, node}, state) do
5
Logger.debug("Node connected: #{node}")
6
7
{:noreply, state}
8
end
9
10
@impl true
11
def handle_info({:nodedown, node}, state = %{nodes: node_list}) do
12
Logger.critical("Node disconnected: #{node}")
13
14
{:noreply, %{state | nodes: %{node_list | node => :offline}}}
15
end
1
# ========== Node monitoring ==========
2
3
@impl true
4
def handle_info({:nodeup, node}, state) do
5
Logger.debug("Node connected: #{node}")
6
7
{:noreply, state}
8
end
9
10
@impl true
11
def handle_info({:nodedown, node}, state = %{nodes: node_list}) do
12
Logger.critical("Node disconnected: #{node}")
13
14
{:noreply, %{state | nodes: %{node_list | node => :offline}}}
15
end
不在 :nodeup:nodeup 回调中将节点状态修改为 :online:online 是因为节点在注册的时候,注册函数已经将节点的状态修改为 :online:online 了。
接口函数
有了功能之后,还需要提供对外接口,GenServerGenServer 已经提供了相关的回调函数供我们实现,在这里我使用 handle_call/3handle_call/3,因为注册流程需要是同步的,只有注册完成之后对应节点才能开始正常运行。
同样地,对外接口也是两个,分别是 :register:register 和 :get_requirements:get_requirements:
1
@impl true
2
# Register node with resource and requirement.
3
def handle_call(
4
{:register, credentials},
5
_from,
6
state
7
) do
8
Logger.info("New register from #{inspect(credentials, pretty: true)}.")
9
10
{:ok, new_state} = register(credentials, state)
11
12
Logger.info("Register #{inspect(credentials, pretty: true)} complete.", ansi_color: :green)
13
14
{:reply, :ok, new_state}
15
end
16
17
@impl true
18
# Reply to caller node with specified requirements
19
def handle_call(
20
{:get_requirements, node},
21
_from,
22
state = %{nodes: _, resources: resources, requirements: requirements}
23
) do
24
Logger.debug("Getting requirements for #{inspect(node)}")
25
26
offer = get_requirements(node, requirements, resources)
27
28
{:reply,
29
case length(offer) do
30
0 -> nil
31
_ ->
32
Logger.info("Requirements retrieved: #{inspect(offer, pretty: true)}", ansi_color: :green)
33
{:ok, offer}
34
end, state}
35
end
1
@impl true
2
# Register node with resource and requirement.
3
def handle_call(
4
{:register, credentials},
5
_from,
6
state
7
) do
8
Logger.info("New register from #{inspect(credentials, pretty: true)}.")
9
10
{:ok, new_state} = register(credentials, state)
11
12
Logger.info("Register #{inspect(credentials, pretty: true)} complete.", ansi_color: :green)
13
14
{:reply, :ok, new_state}
15
end
16
17
@impl true
18
# Reply to caller node with specified requirements
19
def handle_call(
20
{:get_requirements, node},
21
_from,
22
state = %{nodes: _, resources: resources, requirements: requirements}
23
) do
24
Logger.debug("Getting requirements for #{inspect(node)}")
25
26
offer = get_requirements(node, requirements, resources)
27
28
{:reply,
29
case length(offer) do
30
0 -> nil
31
_ ->
32
Logger.info("Requirements retrieved: #{inspect(offer, pretty: true)}", ansi_color: :green)
33
{:ok, offer}
34
end, state}
35
end
1
@impl true
2
# Register node with resource and requirement.
3
def handle_call(
4
{:register, credentials},
5
_from,
6
state
7
) do
8
Logger.info("New register from #{inspect(credentials, pretty: true)}.")
9
10
{:ok, new_state} = register(credentials, state)
11
12
Logger.info("Register #{inspect(credentials, pretty: true)} complete.", ansi_color: :green)
13
14
{:reply, :ok, new_state}
15
end
16
17
@impl true
18
# Reply to caller node with specified requirements
19
def handle_call(
20
{:get_requirements, node},
21
_from,
22
state = %{nodes: _, resources: resources, requirements: requirements}
23
) do
24
Logger.debug("Getting requirements for #{inspect(node)}")
25
26
offer = get_requirements(node, requirements, resources)
27
28
{:reply,
29
case length(offer) do
30
0 -> nil
31
_ ->
32
Logger.info("Requirements retrieved: #{inspect(offer, pretty: true)}", ansi_color: :green)
33
{:ok, offer}
34
end, state}
35
end
1
@impl true
2
# Register node with resource and requirement.
3
def handle_call(
4
{:register, credentials},
5
_from,
6
state
7
) do
8
Logger.info("New register from #{inspect(credentials, pretty: true)}.")
9
10
{:ok, new_state} = register(credentials, state)
11
12
Logger.info("Register #{inspect(credentials, pretty: true)} complete.", ansi_color: :green)
13
14
{:reply, :ok, new_state}
15
end
16
17
@impl true
18
# Reply to caller node with specified requirements
19
def handle_call(
20
{:get_requirements, node},
21
_from,
22
state = %{nodes: _, resources: resources, requirements: requirements}
23
) do
24
Logger.debug("Getting requirements for #{inspect(node)}")
25
26
offer = get_requirements(node, requirements, resources)
27
28
{:reply,
29
case length(offer) do
30
0 -> nil
31
_ ->
32
Logger.info("Requirements retrieved: #{inspect(offer, pretty: true)}", ansi_color: :green)
33
{:ok, offer}
34
end, state}
35
end
至此,BeaconBeacon 功能模块就基本完整了,最后我们需要把它加入到监督树里使其运行起来。在 application.exapplication.ex 中:
1
def start(_type, _args) do
2
children = [
3
# Starts a worker by calling: BeaconServer.Worker.start_link(arg)
4
{BeaconServer.Beacon, name: BeaconServer.Beacon}
5
]
6
7
# See https://hexdocs.pm/elixir/Supervisor.html
8
# for other strategies and supported options
9
opts = [strategy: :one_for_one, name: BeaconServer.Supervisor]
10
Supervisor.start_link(children, opts)
11
end
1
def start(_type, _args) do
2
children = [
3
# Starts a worker by calling: BeaconServer.Worker.start_link(arg)
4
{BeaconServer.Beacon, name: BeaconServer.Beacon}
5
]
6
7
# See https://hexdocs.pm/elixir/Supervisor.html
8
# for other strategies and supported options
9
opts = [strategy: :one_for_one, name: BeaconServer.Supervisor]
10
Supervisor.start_link(children, opts)
11
end
1
def start(_type, _args) do
2
children = [
3
# Starts a worker by calling: BeaconServer.Worker.start_link(arg)
4
{BeaconServer.Beacon, name: BeaconServer.Beacon}
5
]
6
7
# See https://hexdocs.pm/elixir/Supervisor.html
8
# for other strategies and supported options
9
opts = [strategy: :one_for_one, name: BeaconServer.Supervisor]
10
Supervisor.start_link(children, opts)
11
end
1
def start(_type, _args) do
2
children = [
3
# Starts a worker by calling: BeaconServer.Worker.start_link(arg)
4
{BeaconServer.Beacon, name: BeaconServer.Beacon}
5
]
6
7
# See https://hexdocs.pm/elixir/Supervisor.html
8
# for other strategies and supported options
9
opts = [strategy: :one_for_one, name: BeaconServer.Supervisor]
10
Supervisor.start_link(children, opts)
11
end
像这样把 BeaconBeacon 模块加入到监督者的子线程列表中,beacon_serverbeacon_server 暂时就算完成了。
效果测试
运行一下试试:
1
iex --name beacon1@127.0.0.1 --cookie mmo -S mix
1
iex --name beacon1@127.0.0.1 --cookie mmo -S mix
1
iex --name beacon1@127.0.0.1 --cookie mmo -S mix
1
iex --name beacon1@127.0.0.1 --cookie mmo -S mix
为了让其他节点连接,namename 和 cookiecookie 一定好设置好。
我写了点测试代码调用一下试试:
最后我们看一下 BeaconBeacon 模块的 statestate 长什么样:
就先这样,后面我们会在此基础上继续实现别的服务器。