cowboy实现websocket

来源:互联网 时间:1970-01-01

使用 cowboy 实现 websocket,主要需要实现以下几个回调函数。各函数返回值的具体含义,需要结合 websocket 协议本身来理解;协议主要分为两个部分:握手和数据传输。

-export([init/3]). 所有处理程序(handler)通用的回调。要建立 WebSocket 连接,这个函数必须返回 upgrade 元组,即 {upgrade, protocol, cowboy_websocket}。

-export([websocket_init/3]). 初始化 socket 状态,也可以在这里注册进程、启动定时器等;返回值格式请参考 cowboy 文档。之后进行 WebSocket 握手。

-export([websocket_handle/3]). 处理客户端发送过来的消息。返回值格式请参考 cowboy 对应的文档。

-export([websocket_info/3]). 处理 Erlang 进程之间发送的消息。

-export([websocket_terminate/3]). websocket 断开 用于扫尾工作处理

我在这里仍然采用 OTP 的方式构建整个项目,下面是主要代码以及效果。

-module(websocket_app).

-behaviour(application).

%% Application callbacks

-export([start/2, stop/1]).

%% ===================================================================

%% Application callbacks

%% ===================================================================

%% @doc Application start callback: initialises the pid store, starts the
%% HTTP listener on port 8080 and finally starts the top-level supervisor.
start(_StartType, _StartArgs) ->
    %% Start the storage that holds the connection pids. An ETS table would
    %% also work, but Mnesia was chosen to make clustering easier.
    ok = websocket_store:init(),
    %% Routing table: index page, websocket endpoint and static assets.
    Routes = [
        {"/", cowboy_static, {priv_file, websocket, "index.html"}},
        {"/websocket", websocket_hander, []},
        {"/static/[...]", cowboy_static, {priv_dir, websocket, "static"}}
    ],
    Dispatch = cowboy_router:compile([{'_', Routes}]),
    TransportOpts = [{port, 8080}],
    ProtocolOpts = [{env, [{dispatch, Dispatch}]}],
    {ok, _} = cowboy:start_http(http, 100, TransportOpts, ProtocolOpts),
    %% Start the supervision tree; its result is this callback's result.
    websocket_sup:start_link().

%% @doc Application stop callback.
%%
%% Stops the `http' listener that start/2 created with cowboy:start_http/4.
%% That listener is started outside websocket_sup (which has no children),
%% so nothing else in this application shuts it down.
%% NOTE(review): presumably the listener would otherwise keep running and
%% make the `{ok, _}' match in start/2 fail on a restart - confirm against
%% the cowboy/ranch version in use.
stop(_State) ->
    %% The result is deliberately not matched: an error such as
    %% `{error, not_found}' only means the listener is already gone.
    cowboy:stop_listener(http),
    ok.

-module(websocket_sup).

-behaviour(supervisor).

%% API

-export([start_link/0]).

%% Supervisor callbacks

-export([init/1]).

%% Helper macro for declaring children of supervisor

-define(CHILD(I, Type), {I, {I, start_link, []}, permanent, 2000, Type, [I]}).

%% ===================================================================

%% API functions

%% ===================================================================

%% @doc Starts the supervisor, registered locally under the module name,
%% with this module as the callback module and `[]' as the init argument.
start_link() ->
    SupName = {local, ?MODULE},
    supervisor:start_link(SupName, ?MODULE, []).

%% ===================================================================

%% Supervisor callbacks

%% ===================================================================

%% @doc Supervisor callback: `one_for_one' strategy with a restart limit of
%% 5 restarts in 10 seconds, and no statically defined children.
init([]) ->
    RestartStrategy = {one_for_one, 5, 10},
    ChildSpecs = [],
    {ok, {RestartStrategy, ChildSpecs}}.

View Code

-module(websocket_hander).

-behaviour(cowboy_websocket_handler).

%% ------------------------------------------------------------------

%% API Function Exports

%% ------------------------------------------------------------------

%% ------------------------------------------------------------------

%% cowboy_websocket_handler Callback Exports

%% ------------------------------------------------------------------

-export([init/3]).

-export([websocket_init/3]).

-export([websocket_handle/3]).

-export([websocket_info/3]).

-export([websocket_terminate/3]).

%% ------------------------------------------------------------------

%% API Function Definitions

%% ------------------------------------------------------------------

%% ------------------------------------------------------------------

%% cowboy_websocket_handler Callback Definitions

%% ------------------------------------------------------------------

%% Websocket handshake: records the calling process in the store and then
%% asks cowboy to upgrade the connection to the websocket protocol.
init({tcp, http}, _Req, _Opts) ->
    %% Insert this process into the store with socket type `web'.
    Self = self(),
    websocket_store:dirty_insert(Self, web),
    {upgrade, protocol, cowboy_websocket}.

%% Connection initialisation.
websocket_init(_TransportName, Req, _Opts) ->
    %% One second after the connection is set up, a timer message carrying
    %% <<"Hello!">> is delivered to this process.
    Greeting = <<"Hello!">>,
    _TimerRef = erlang:start_timer(1000, self(), Greeting),
    {ok, Req, undefined_state}.

%% Handles frames sent by the client.
%%
%% A text frame is forwarded as `{chat, Msg}' to every other process that
%% websocket_store lists with socket type `web', and is then echoed back to
%% the sender with a fixed prefix. Any other frame is ignored.
websocket_handle({text, Msg}, Req, State) ->
    %% The message could just as well be delivered to plain TCP sockets from
    %% here, which is how one service could support several protocols.
    Self = self(),
    lists:foreach(
        fun
            %% Only other websocket processes receive the message; the
            %% sender is answered through the reply tuple below.
            ([Pid, web]) when Pid =/= Self -> Pid ! {chat, Msg};
            %% The sender's own entry, `tcp' entries and entries with an
            %% unknown socket type are skipped instead of crashing the
            %% handler.
            (_Entry) -> ok
        end,
        websocket_store:dirty_lookall()
    ),
    {reply, {text, <<"That's what she said! ", Msg/binary>>}, Req, State};
websocket_handle(_Data, Req, State) ->
    {ok, Req, State}.

%% Handles Erlang messages sent to this process.
%%
%% `{chat, Text}' is the message websocket_handle/3 sends to the other
%% websocket processes; it is pushed to the client as a text frame.
websocket_info({chat, Text}, Req, State) ->
    {reply, {text, Text}, Req, State};
%% Timer message produced by erlang:start_timer(1000, self(), <<"Hello!">>).
websocket_info({timeout, _TimerRef, Text}, Req, State) ->
    {reply, {text, Text}, Req, State};
%% Every other message is ignored.
websocket_info(_Other, Req, State) ->
    {ok, Req, State}.

%% Called when the websocket is closed; removes the calling process from the
%% store as clean-up.
websocket_terminate(_Reason, _Req, _State) ->
    Self = self(),
    websocket_store:dirty_delete(Self),
    ok.

%% ------------------------------------------------------------------

%% Internal Function Definitions

%% ------------------------------------------------------------------

%%%-------------------------------------------------------------------

%%% @author thinkpad <>

%%% @copyright (C) 2014, thinkpad

%%% @doc

%%%

%%% @end

%%% Created : 27 Jun 2014 by thinkpad <>

%%%-------------------------------------------------------------------

-module(websocket_store).

-include_lib("stdlib/include/qlc.hrl" ).

%% API

-export([dirty_insert/2,dirty_lookall/0,dirty_lookall_record/0,dirty_delete/1]).

-export([init/0,insert/2,delete/1,lookup/1,lookall/0]).

-record(socket_to_pid, {pid,socket_type}).

-define(WAIT_FOR_TABLES, 10000).

%%%===================================================================

%%% API

%%%===================================================================

%% @doc Initialises the store by (re)creating the Mnesia schema and table.
%% Returns `ok'.
init() ->
    ok = dynamic_db_init().

%%--------------------------------------------------------------------
%% @doc Writes the entry Pid / SocketType inside a Mnesia transaction.
%% Returns the `{atomic, Result}' tuple of the transaction; an aborted
%% transaction fails the match and crashes the caller.
%% @end
%%--------------------------------------------------------------------
insert(Pid, SocketType) when is_pid(Pid) ->
    Record = #socket_to_pid{pid = Pid, socket_type = SocketType},
    {atomic, _} = mnesia:transaction(fun() -> mnesia:write(Record) end).

%%--------------------------------------------------------------------
%% @doc Writes the entry Pid / SocketType with a dirty
%% (non-transactional) Mnesia write and returns the result of
%% mnesia:dirty_write/1.
%% @end
%%--------------------------------------------------------------------
dirty_insert(Pid, SocketType) when is_pid(Pid) ->
    Record = #socket_to_pid{pid = Pid, socket_type = SocketType},
    mnesia:dirty_write(Record).

%%--------------------------------------------------------------------
%% @doc Dirty-selects every entry of the socket_to_pid table. Each entry
%% is returned as a two-element list `[Pid, SocketType]' (the `$$' result
%% of the match specification).
%% @end
%%--------------------------------------------------------------------
dirty_lookall() ->
    Pattern = #socket_to_pid{pid = '$1', socket_type = '$2'},
    MatchSpec = [{Pattern, [], ['$$']}],
    mnesia:dirty_select(socket_to_pid, MatchSpec).

%% @doc Removes the entry keyed by Pid with a dirty (non-transactional)
%% Mnesia delete and returns the result of mnesia:dirty_delete/2.
dirty_delete(Pid) ->
    Table = socket_to_pid,
    mnesia:dirty_delete(Table, Pid).

%%--------------------------------------------------------------------
%% @doc Dirty-selects every entry of the socket_to_pid table and returns
%% the complete records (the `$_' result of the match specification).
%% @end
%%--------------------------------------------------------------------
dirty_lookall_record() ->
    Pattern = #socket_to_pid{pid = '$1', socket_type = '$2'},
    MatchSpec = [{Pattern, [], ['$_']}],
    mnesia:dirty_select(socket_to_pid, MatchSpec).

%%--------------------------------------------------------------------
%% @doc Looks up the entries whose pid equals Pid, using a QLC query run
%% in a Mnesia transaction. Returns a list of `{Pid, SocketType}' tuples,
%% which is empty when nothing matches.
%% @end
%%--------------------------------------------------------------------
lookup(Pid) ->
    Query = qlc:q([
        {Entry#socket_to_pid.pid, Entry#socket_to_pid.socket_type}
     || Entry <- mnesia:table(socket_to_pid),
        Entry#socket_to_pid.pid == Pid
    ]),
    do(Query).

%%--------------------------------------------------------------------
%% @doc Returns every entry of the socket_to_pid table, using a QLC query
%% run in a Mnesia transaction. Each entry is a two-element list
%% `[Pid, SocketType]'.
%% @end
%%--------------------------------------------------------------------
lookall() ->
    Query = qlc:q([
        [Entry#socket_to_pid.pid, Entry#socket_to_pid.socket_type]
     || Entry <- mnesia:table(socket_to_pid)
    ]),
    do(Query).

%%--------------------------------------------------------------------
%% @doc Deletes the entry keyed by Pid inside a Mnesia transaction (the
%% transactional counterpart of dirty_delete/1). Returns `ok' whether or
%% not an entry existed; an aborted transaction fails the match and
%% crashes the caller instead of being silently ignored.
%% @end
%%--------------------------------------------------------------------
delete(Pid) ->
    %% `pid' is the first field of the socket_to_pid record and therefore
    %% the table key, so the entry can be removed by key without reading it
    %% first.
    DeleteFun = fun() -> mnesia:delete({socket_to_pid, Pid}) end,
    {atomic, ok} = mnesia:transaction(DeleteFun),
    ok.

%%--------------------------------------------------------------------

%% @doc

%% @spec

%% @end

%%--------------------------------------------------------------------

%%%===================================================================

%%% Internal functions

%%%===================================================================

%% Resets the local Mnesia schema via delete_schema/0 and then creates the
%% socket_to_pid table, using the record's fields as the table attributes.
%% Returns `ok'; a failed table creation fails the match and crashes.
dynamic_db_init() ->
    delete_schema(),
    Fields = record_info(fields, socket_to_pid),
    {atomic, ok} = mnesia:create_table(socket_to_pid, [{attributes, Fields}]),
    ok.

%% Stops Mnesia, deletes the schema of the local node and starts Mnesia
%% again. The results of the stop and delete calls are ignored; the value of
%% mnesia:start/0 is returned as-is.
delete_schema() ->
    LocalNodes = [node()],
    mnesia:stop(),
    mnesia:delete_schema(LocalNodes),
    mnesia:start().

%% Evaluates a QLC query inside a Mnesia transaction and returns its answers.
%% An aborted transaction fails the match and crashes the caller.
do(Query) ->
    {atomic, Answers} = mnesia:transaction(fun() -> qlc:e(Query) end),
    Answers.



相关阅读:
Top