使用 erlang OTP 模式编写非阻塞的 tcp 服务器(来自erlang wiki)

来源:互联网 时间:1970-01-01

参考资料:http://erlangcentral.org/wiki/index.php/Building_a_Non-blocking_TCP_server_using_OTP_principles

服务器设计
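tcp_server_app 的根监控树使用 one_for_one 重启策略,下面挂着两个子进程(注意它们不是独立的 application)。第一个是 tcp_listener,一个用 gen_server 实现的 TCP 套接字监听进程,以异步方式接受客户端连接。第二个是 tcp_client_sup,一个使用 simple_one_for_one 策略的监控进程,每接受一个新连接就动态启动一个用 gen_fsm 实现的客户端进程(tcp_echo_fsm)。客户端进程通过标准的 SASL 错误报告接口(error_logger)记录消息处理日志,以及连接超时、与服务器断开连接等事件。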

整体应用架构:
                 +----------------+
                 | tcp_server_app |
                 +--------+-------+
                          | (one_for_one)
         +----------------+---------+
         |                          |
 +-------+------+           +-------+--------+
 | tcp_listener |           | tcp_client_sup |
 +--------------+           +-------+--------+
                                    | (simple_one_for_one)
                            +-------+--------+
                            |  tcp_echo_fsm  |  (每个客户端连接一个进程,N 个实例)
                            +----------------+

tcp_server_app 模块的代码如下:

 %% TCP Server Application (tcp_server_app.erl)
%%
%% Application callback module of the demo TCP server. The same module is
%% also the callback module of two supervisors, told apart by the argument
%% passed to init/1:
%%   * init([Port, Module]) - the root supervisor (registered as ?MODULE),
%%     started from start/2;
%%   * init([Module])       - tcp_client_sup, a simple_one_for_one
%%     supervisor holding one client process per accepted connection.
-module(tcp_server_app).
-author('[email protected]').

-behaviour(application).
-behaviour(supervisor).

%% Called by tcp_listener for every accepted connection.
-export([start_client/0]).

%% application and supervisor callbacks
-export([start/2, stop/1, init/1]).

%% Restart intensity shared by both supervisors: at most ?MAX_RESTART
%% restarts within ?MAX_TIME seconds.
-define(MAX_RESTART, 5).
-define(MAX_TIME, 60).
%% Listen port used when listen_port is configured neither in the
%% application environment nor on the command line.
-define(DEF_PORT, 2222).

%%----------------------------------------------------------------------
%% API
%%----------------------------------------------------------------------

%% @doc Start one client process under tcp_client_sup.
%% tcp_client_sup is simple_one_for_one, so start_child/2 calls the start
%% function of its single child template ({Module, start_link, []}, with
%% Module = tcp_echo_fsm as passed by start/2), appending the extra
%% arguments [] - i.e. tcp_echo_fsm:start_link().
%% The supervisor's init/1 is NOT invoked here; it ran once when
%% tcp_client_sup itself was started.
%% Returns {ok, Pid} or {error, Reason}.
start_client() ->
    supervisor:start_child(tcp_client_sup, []).

%%----------------------------------------------------------------------
%% Application behaviour callbacks
%%----------------------------------------------------------------------

%% @doc Read the listen port (falling back to ?DEF_PORT) and start the root
%% supervisor; this dispatches to the init([Port, Module]) clause below.
start(_Type, _Args) ->
    ListenPort = get_app_env(listen_port, ?DEF_PORT),
    supervisor:start_link({local, ?MODULE}, ?MODULE, [ListenPort, tcp_echo_fsm]).

%% @doc Nothing to clean up; the supervision tree is stopped by OTP.
stop(_State) ->
    ok.

%%----------------------------------------------------------------------
%% Supervisor behaviour callbacks
%%----------------------------------------------------------------------

%% @doc Root supervisor (one_for_one): the listener worker plus the
%% client supervisor.
init([Port, Module]) ->
    SupFlags = {one_for_one, ?MAX_RESTART, ?MAX_TIME},
    %% Note: despite the id tcp_server_sup, this child is the listener.
    Listener = {tcp_server_sup,
                {tcp_listener, start_link, [Port, Module]},
                permanent,
                2000,
                worker,
                [tcp_listener]},
    %% Starts tcp_client_sup by re-entering this module's init/1 with
    %% [Module] (the clause below).
    ClientSup = {tcp_client_sup,
                 {supervisor, start_link, [{local, tcp_client_sup}, ?MODULE, [Module]]},
                 permanent,
                 infinity,
                 supervisor,
                 [?MODULE]},   % callback module, used by the release handler
    {ok, {SupFlags, [Listener, ClientSup]}};

%% @doc tcp_client_sup (simple_one_for_one): a template for client
%% processes, which are only ever added dynamically via start_client/0.
init([Module]) ->
    SupFlags = {simple_one_for_one, ?MAX_RESTART, ?MAX_TIME},
    Client = {undefined,              % id is ignored for simple_one_for_one
              {Module, start_link, []},
              temporary,              % a dead client process is never restarted
              2000,
              worker,
              [Module]},              % callback module, used by the release handler
    {ok, {SupFlags, [Client]}}.

%%----------------------------------------------------------------------
%% Internal functions
%%----------------------------------------------------------------------

%% @doc Look up configuration parameter Opt in this order: the application
%% environment (.app file / sys.config), then a `-Opt Value` command-line
%% flag, then Default.
get_app_env(Opt, Default) ->
    %% get_env/1 resolves the caller's own application. The previous code
    %% passed application:get_application() to get_env/2, but that returns
    %% {ok, App} | undefined rather than a bare atom, so the lookup never
    %% found anything.
    case application:get_env(Opt) of
        {ok, Val} ->
            Val;
        undefined ->
            get_cmdline_arg(Opt, Default)
    end.

%% init:get_argument/1 returns {ok, [[Value, ...], ...]} | error; a flag
%% given without a value yields {ok, [[]]} and falls back to Default.
get_cmdline_arg(Opt, Default) ->
    case init:get_argument(Opt) of
        {ok, [[Val | _] | _]} -> from_cmdline(Val, Default);
        _ -> Default
    end.

%% Command-line values are strings. Coerce to an integer when the default
%% is one (e.g. the listen port, which tcp_listener requires as integer);
%% a non-numeric value crashes with badarg so misconfiguration is visible.
from_cmdline(Val, Default) when is_integer(Default) -> list_to_integer(Val);
from_cmdline(Val, _Default) -> Val.

 

下面是服务端的 socket 监听进程 tcp_listener。它使用了一个没有官方文档的内部 API prim_inet:async_accept/2 来异步接受客户端连接(该 API 属于 OTP 内部实现,不同版本之间可能发生变化),代码如下:

 % TCP Listener Process (tcp_listener.erl)
%%
%% gen_server that owns the listening socket and accepts connections
%% asynchronously via the undocumented prim_inet:async_accept/2. Each
%% accepted socket is handed to a freshly started client process
%% (tcp_server_app:start_client/0) running the callback module Module.
-module(tcp_listener).
-author('[email protected]').

-behaviour(gen_server).

%% API
-export([start_link/2]).

%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
         code_change/3]).

-record(state, {
          listener,  % listening socket
          acceptor,  % reference of the outstanding prim_inet:async_accept/2
          module     % callback module of the client process (e.g. tcp_echo_fsm)
         }).

%%--------------------------------------------------------------------
%% @spec (Port::integer(), Module::atom()) -> {ok, Pid} | {error, Reason}
%% @doc Called by the supervisor. Registers this process locally as
%% tcp_listener and starts listening on Port.
%% @end
%%--------------------------------------------------------------------
start_link(Port, Module) when is_integer(Port), is_atom(Module) ->
    gen_server:start_link({local, ?MODULE}, ?MODULE, [Port, Module], []).

%%%-------------------------------------------------------------------
%%% gen_server callbacks
%%%-------------------------------------------------------------------

%% @doc Open the listening socket and arm the first asynchronous accept.
%% Returns {stop, Reason} when gen_tcp:listen/2 fails.
%% @private
init([Port, Module]) ->
    %% Trap exits so terminate/2 also runs on supervisor shutdown.
    process_flag(trap_exit, true),
    Opts = [binary, {packet, 2}, {reuseaddr, true},
            {keepalive, true}, {backlog, 30}, {active, false}],
    case gen_tcp:listen(Port, Opts) of
        {ok, ListenSocket} ->
            %% async_accept/2 returns at once; a later connection arrives
            %% as {inet_async, ListenSocket, Ref, {ok, ClientSocket}} in
            %% this process's mailbox (see handle_info/2).
            {ok, Ref} = prim_inet:async_accept(ListenSocket, -1),
            {ok, #state{listener = ListenSocket,
                        acceptor = Ref,
                        module   = Module}};
        {error, Reason} ->
            {stop, Reason}
    end.

%% @doc There is no synchronous API; any call stops the server.
%% @private
handle_call(Request, _From, State) ->
    {stop, {unknown_call, Request}, State}.

%% @doc Casts are ignored.
%% @private
handle_cast(_Msg, State) ->
    {noreply, State}.

%% @doc Completion of the outstanding asynchronous accept. Matching
%% ListSock and Ref against the state ensures only the result of our own
%% pending accept is processed.
%% @private
handle_info({inet_async, ListSock, Ref, {ok, CliSocket}},
            #state{listener = ListSock, acceptor = Ref, module = Module} = State) ->
    try
        hand_off(ListSock, CliSocket, Module),
        %% Each async_accept/2 yields exactly one inet_async message, so
        %% it must be re-armed to be told about the next connection.
        NewRef = rearm_accept(ListSock),
        {noreply, State#state{acceptor = NewRef}}
    catch
        %% Catch every class: a failed match such as {ok, Pid} = ... raises
        %% an *error*, which the previous `catch exit:Why` let escape
        %% without logging.
        _Class:Why ->
            error_logger:error_msg("Error in async accept: ~p.\n", [Why]),
            {stop, Why, State}
    end;

%% The accept itself failed; stop and let the supervisor restart us.
handle_info({inet_async, ListSock, Ref, Error},
            #state{listener = ListSock, acceptor = Ref} = State) ->
    error_logger:error_msg("Error in socket acceptor: ~p.\n", [Error]),
    {stop, Error, State};

handle_info(_Info, State) ->
    {noreply, State}.

%% @doc Close the listening socket when the server stops. For supervisor
%% shutdown this requires trap_exit, which init/1 sets. Return value is
%% ignored.
%% @private
terminate(_Reason, #state{listener = ListSock}) ->
    gen_tcp:close(ListSock),
    ok.

%% @private
code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

%%%-------------------------------------------------------------------
%%% Internal functions
%%%-------------------------------------------------------------------

%% Configure CliSocket, start a client process and transfer the socket to
%% it. Raises exit({Step, Reason}) on failure; a client process whose
%% socket could not be transferred is killed rather than left waiting.
hand_off(ListSock, CliSocket, Module) ->
    case set_sockopt(ListSock, CliSocket) of
        ok -> ok;
        {error, SockErr} -> exit({set_sockopt, SockErr})
    end,
    {ok, Pid} = tcp_server_app:start_client(),
    %% Make Pid the owner so CliSocket's messages go to Pid's mailbox.
    case gen_tcp:controlling_process(CliSocket, Pid) of
        ok ->
            %% Tell the new process it now owns the socket.
            Module:set_socket(Pid, CliSocket);
        {error, OwnerErr} ->
            exit(Pid, kill),
            gen_tcp:close(CliSocket),
            exit({controlling_process, OwnerErr})
    end.

%% Queue the next asynchronous accept and return its reference.
rearm_accept(ListSock) ->
    case prim_inet:async_accept(ListSock, -1) of
        {ok, NewRef} -> NewRef;
        {error, Reason} -> exit({async_accept, inet:format_error(Reason)})
    end.

%% Copy selected options from the listening socket to the accepted one.
%% Because the listener uses {active, false}, the client socket stays
%% passive until its new owner enables it. The inet_db registration is
%% presumably needed because async-accepted sockets bypass gen_tcp:accept
%% and are not registered otherwise - NOTE(review): confirm for your OTP.
%% On failure CliSocket is closed and {error, Reason} returned.
set_sockopt(ListSock, CliSocket) ->
    true = inet_db:register_socket(CliSocket, inet_tcp),
    case prim_inet:getopts(ListSock, [active, nodelay, keepalive, delay_send, priority, tos]) of
        {ok, Opts} ->
            case prim_inet:setopts(CliSocket, Opts) of
                ok -> ok;
                Error -> gen_tcp:close(CliSocket), Error
            end;
        Error ->
            gen_tcp:close(CliSocket), Error
    end.

下面是处理单个客户端连接的状态机 tcp_echo_fsm(它把客户端发来的数据原样回显给客户端):

 %% TCP Client Socket Handling FSM (tcp_echo_fsm.erl)
%%
%% One process per client connection. It echoes every message received on
%% the socket back to the client, and stops after ?TIMEOUT ms without data
%% or when the client closes the connection.
-module(tcp_echo_fsm).
-author('[email protected]').

%% gen_fsm is deprecated in recent OTP releases in favour of gen_statem;
%% it is kept here to match the original design.
-behaviour(gen_fsm).

-export([start_link/0, set_socket/2]).

%% gen_fsm callbacks
-export([init/1, handle_event/3,
         handle_sync_event/4, handle_info/3, terminate/3, code_change/4]).

%% FSM state functions
-export([
         'WAIT_FOR_SOCKET'/2,  % waiting for the listener to hand over the socket
         'WAIT_FOR_DATA'/2     % socket owned; waiting for client data
        ]).

-record(state, {
          socket,  % client socket
          addr     % client IP address
         }).

%% Idle timeout (ms) while in 'WAIT_FOR_DATA'.
-define(TIMEOUT, 120000).

%%%------------------------------------------------------------------------
%%% API
%%%------------------------------------------------------------------------

%%-------------------------------------------------------------------------
%% @spec () -> {ok, Pid} | ignore | {error, Error}
%% @doc Called by the client supervisor to start a client process. The
%% process starts without a socket and waits for set_socket/2.
%% @end
%%-------------------------------------------------------------------------
start_link() ->
    gen_fsm:start_link(?MODULE, [], []).

%% @doc Notify Pid that it now owns Socket. The caller must already have
%% made Pid the socket's controlling process.
set_socket(Pid, Socket) when is_pid(Pid), is_port(Socket) ->
    gen_fsm:send_event(Pid, {socket_ready, Socket}).

%%%------------------------------------------------------------------------
%%% gen_fsm callbacks
%%%------------------------------------------------------------------------

%% @private
init([]) ->
    %% Trap exits so terminate/3 runs (and closes the socket) when the
    %% supervisor shuts this process down.
    process_flag(trap_exit, true),
    {ok, 'WAIT_FOR_SOCKET', #state{}}.

%% @doc Initial state: handles the {socket_ready, Socket} event sent by
%% set_socket/2, then switches the socket to one-message-at-a-time
%% delivery ({active, once}) and starts waiting for data.
%% @private
'WAIT_FOR_SOCKET'({socket_ready, Socket}, State) when is_port(Socket) ->
    inet:setopts(Socket, [{active, once}, {packet, 2}, binary]),
    {ok, {IP, _Port}} = inet:peername(Socket),
    {next_state, 'WAIT_FOR_DATA', State#state{socket = Socket, addr = IP}, ?TIMEOUT};
'WAIT_FOR_SOCKET'(Other, State) ->
    error_logger:error_msg("State: 'WAIT_FOR_SOCKET'. Unexpected message: ~p\n", [Other]),
    %% Stay in this state so socket_ready can still arrive.
    {next_state, 'WAIT_FOR_SOCKET', State}.

%% @doc Echo received data back; stop after ?TIMEOUT ms of inactivity.
%% @private
'WAIT_FOR_DATA'({data, Data}, #state{socket = S} = State) ->
    ok = gen_tcp:send(S, Data),
    {next_state, 'WAIT_FOR_DATA', State, ?TIMEOUT};
'WAIT_FOR_DATA'(timeout, State) ->
    error_logger:error_msg("~p Client connection timeout - closing.\n", [self()]),
    {stop, normal, State};
'WAIT_FOR_DATA'(Data, State) ->
    io:format("~p Ignoring data: ~p\n", [self(), Data]),
    {next_state, 'WAIT_FOR_DATA', State, ?TIMEOUT}.

%% @doc No all-state events are expected; any one stops the process.
%% @private
handle_event(Event, StateName, StateData) ->
    {stop, {StateName, undefined_event, Event}, StateData}.

%% @doc No synchronous events are expected; any one stops the process.
%% @private
handle_sync_event(Event, _From, StateName, StateData) ->
    {stop, {StateName, undefined_event, Event}, StateData}.

%% @doc Raw socket messages for our own socket.
%% @private
handle_info({tcp, Socket, Bin}, StateName, #state{socket = Socket} = StateData) ->
    %% Flow control: re-arm {active, once} so the next packet is delivered.
    inet:setopts(Socket, [{active, once}]),
    ?MODULE:StateName({data, Bin}, StateData);
handle_info({tcp_closed, Socket}, _StateName,
            #state{socket = Socket, addr = Addr} = StateData) ->
    error_logger:info_msg("~p Client ~p disconnected.\n", [self(), Addr]),
    {stop, normal, StateData};
%% Ignore any other message. gen_fsm requires a {next_state, ...} return;
%% the former {noreply, StateName, StateData} is not a valid gen_fsm return
%% and crashed the process on any stray message. In 'WAIT_FOR_DATA' the
%% idle timeout is re-armed, because receiving a message cancels it.
handle_info(_Info, 'WAIT_FOR_DATA' = StateName, StateData) ->
    {next_state, StateName, StateData, ?TIMEOUT};
handle_info(_Info, StateName, StateData) ->
    {next_state, StateName, StateData}.

%% @doc Best-effort socket close. socket is still undefined if the process
%% never left 'WAIT_FOR_SOCKET', hence the catch.
%% @private
terminate(_Reason, _StateName, #state{socket = Socket}) ->
    (catch gen_tcp:close(Socket)),
    ok.

%% @private
code_change(_OldVsn, StateName, StateData, _Extra) ->
    {ok, StateName, StateData}.

最后是应用资源文件 tcp_server.app:

 %% tcp_server.app - application resource file
%%
%% registered lists the names actually registered by this application:
%% the root supervisor (tcp_server_app), the listener (tcp_listener) and
%% the client supervisor (tcp_client_sup). The previous entry
%% tcp_server_sup is only a child id, not a registered name.
{application, tcp_server,
 [
  {description, "Demo TCP server"},
  {vsn, "1.0"},
  {id, "tcp_server"},
  {modules, [tcp_server_app, tcp_listener, tcp_echo_fsm]},
  {registered, [tcp_server_app, tcp_listener, tcp_client_sup]},
  {applications, [kernel, stdlib]},
  %% mod: the application callback module and its start argument
  {mod, {tcp_server_app, []}},
  %% listen_port may be set here, e.g. {env, [{listen_port, 2222}]}
  {env, []}
 ]
}.

以上基本上都是个人查找资料过程的笔记,有理解错误的地方请评论指出,谢谢!


相关阅读:
Top