MINA in Practice


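1 Single-threaded model: the echo flow

Client

When the request is larger than the server's receive buffer, an insufficient-buffer exception is thrown.

Client execution flow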

AbstractPollingIoConnector.connect0(SocketAddress, SocketAddress, IoSessionInitializer<? extends ConnectFuture>) ->

NioSocketConnector.connect(SocketChannel, SocketAddress) ->

a thread pool (ThreadPoolExecutor) runs AbstractPollingIoConnector.Connector<T, H>

-> inside the connector, readiness selection is performed by NioSocketConnector.select(int) ->

NioSocketConnector.selectedHandles() yields the selection keys, and each key gives the associated socket channel ->

the socket channel is then handled by AbstractPollingIoConnector.processConnections(Iterator<H>) -> the socket channel is wrapped in a NioSession and associated with an IoProcessor (SimpleIoProcessorPool) by NioSocketConnector.newSession(IoProcessor<NioSession>, SocketChannel) -> IoProcessor.add(IoSession session) adds it to the SimpleIoProcessorPool -> SimpleIoProcessorPool.getProcessor(T session) -> AbstractPollingIoProcessor.add(T session)

-> AbstractPollingIoProcessor.startupProcessor() -> a thread pool (ThreadPoolExecutor) starts AbstractPollingIoProcessor$Processor, which is the reactor -> AbstractPollingIoProcessor.select(long timeout): the reactor polls for readiness, with read events registered on the selector -> when a read event is ready, AbstractPollingIoProcessor.process(T session) is executed.
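The step where a session is handed to one processor out of a pool can be pictured with plain java.util.concurrent. This is only a sketch under stated assumptions: ProcessorPoolSketch, indexFor and threadFor are hypothetical names, and pinning a session by its id modulo the pool size is an assumption made for illustration, not a statement of how SimpleIoProcessorPool is implemented.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch, not MINA code: a fixed pool of single-threaded
// "processors", with every session pinned to exactly one of them.
final class ProcessorPoolSketch {
    private final ExecutorService[] processors;

    ProcessorPoolSketch(int size) {
        processors = new ExecutorService[size];
        for (int i = 0; i < size; i++) {
            processors[i] = Executors.newSingleThreadExecutor();
        }
    }

    // Assumed pinning rule: session id modulo pool size (never negative).
    static int indexFor(long sessionId, int poolSize) {
        return (int) Math.floorMod(sessionId, (long) poolSize);
    }

    // Runs a trivial "event" for the session and reports which thread handled it.
    String threadFor(long sessionId) {
        ExecutorService processor = processors[indexFor(sessionId, processors.length)];
        Callable<String> whoAmI = () -> Thread.currentThread().getName();
        try {
            return processor.submit(whoAmI).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    void shutdown() {
        for (ExecutorService processor : processors) {
            processor.shutdown();
        }
    }

    public static void main(String[] args) {
        // Pool size of CPU cores + 1, the figure this article cites.
        int size = Runtime.getRuntime().availableProcessors() + 1;
        ProcessorPoolSketch pool = new ProcessorPoolSketch(size);
        System.out.println("session 7 -> " + pool.threadFor(7));
        System.out.println("session 7 -> " + pool.threadFor(7)); // same thread as before
        System.out.println("session 8 -> " + pool.threadFor(8));
        pool.shutdown();
    }
}
```

Because each processor owns a single thread, all events of one session are handled sequentially on that thread, which is what makes the no-ExecutorFilter setup a single-threaded model from the session's point of view.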

IoConnector connector = new NioSocketConnector();
StringBuffer req = new StringBuffer();
int length = 2048 / 26;
for (int i = 0; i < length; i++) {
    // 26 byte
    req.append("hello server-");
}
System.out.println(req.length() * 2 + "bytes");
connector.setConnectTimeoutMillis(30000);
connector.getFilterChain().addLast("codec",
        new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8"),
                LineDelimiter.WINDOWS.getValue(), LineDelimiter.WINDOWS.getValue())));
connector.setHandler(new ClientHandler(req.toString()));
connector.connect(new InetSocketAddress("localhost", 9123));
while (true) {
    Thread.sleep(10000);
}
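The client above prints req.length() * 2 as its size estimate, that is, two bytes per character. Since the codec is configured with UTF-8 and the payload is plain ASCII, the size on the wire can be checked directly. The sketch below is not part of the original client; RequestSize, buildRequest and encodedLength are hypothetical helper names.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper: compares the two-bytes-per-char estimate with the
// real UTF-8 encoded length of the same request string.
final class RequestSize {

    // Builds the same payload as the client: "hello server-" repeated n times.
    static String buildRequest(int repetitions) {
        StringBuilder req = new StringBuilder();
        for (int i = 0; i < repetitions; i++) {
            req.append("hello server-");
        }
        return req.toString();
    }

    // Number of bytes the text occupies once encoded as UTF-8.
    static int encodedLength(String text) {
        return text.getBytes(StandardCharsets.UTF_8).length;
    }

    public static void main(String[] args) {
        String req = buildRequest(2048 / 26);
        System.out.println("characters:        " + req.length());
        System.out.println("estimate (x2):     " + req.length() * 2);
        System.out.println("UTF-8 bytes:       " + encodedLength(req));
    }
}
```

For ASCII text each character encodes to one UTF-8 byte, so the encoded request (before the line delimiter is appended) is half of the printed estimate.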

public class ClientHandler extends IoHandlerAdapter {
    private final static Logger log = LoggerFactory.getLogger(ClientHandler.class);
    private final String values;

    public ClientHandler(String values) {
        this.values = values;
    }

    @Override
    public void messageReceived(IoSession session, Object message) throws Exception {
        String str = message.toString();
        log.info("The response message received length is [" + str.length() * 2 + "bytes ]");
        log.info("The response message received is [" + str + "]");
    }

    @Override
    public void sessionOpened(IoSession session) {
        log.info(session.getLocalAddress() + "------" + session.getRemoteAddress() + "sessionOpened");
        session.write(values);
    }

    @Override
    public void sessionCreated(IoSession session) throws Exception {
        log.info(session.getLocalAddress() + "------" + session.getRemoteAddress() + "sessionCreated");
    }

    @Override
    public void sessionClosed(IoSession session) throws Exception {
        log.info(session.getLocalAddress() + "------" + session.getRemoteAddress() + "sessionClosed");
    }

    @Override
    public void sessionIdle(IoSession session, IdleStatus status) throws Exception {
        System.out.println("IDLE " + session.getIdleCount(status));
    }

    @Override
    public void messageSent(IoSession session, Object message) throws Exception {
        log.info(session.getLocalAddress() + "------" + session.getRemoteAddress() + "messageSent");
    }
}

// Variant of ClientHandler that also prints the elapsed time once
// totalNum responses have been received.
public class ClientHandler extends IoHandlerAdapter {
    private final static Logger log = LoggerFactory.getLogger(ClientHandler.class);
    private final String values;
    private final int totalNum;
    private AtomicInteger finichNum = new AtomicInteger();
    private long start;

    public ClientHandler(String values, int totalNum) {
        this.values = values;
        this.totalNum = totalNum;
    }

    @Override
    public void messageReceived(IoSession session, Object message) throws Exception {
        String str = message.toString();
        log.info("The response message received length is [" + str.length() * 2 + "bytes ]");
        log.info("The response message received is [" + str + "]");
        if (finichNum.addAndGet(1) == totalNum) {
            System.out.println(System.currentTimeMillis() - start);
        }
    }

    @Override
    public void sessionOpened(IoSession session) {
        log.info(session.getLocalAddress() + "------" + session.getRemoteAddress() + "sessionOpened");
        session.write(values);
        if (finichNum.get() == 0) {
            start = System.currentTimeMillis();
        }
    }

    @Override
    public void sessionCreated(IoSession session) throws Exception {
        log.info(session.getLocalAddress() + "------" + session.getRemoteAddress() + "sessionCreated");
    }

    @Override
    public void sessionClosed(IoSession session) throws Exception {
        log.info(session.getLocalAddress() + "------" + session.getRemoteAddress() + "sessionClosed");
    }

    @Override
    public void sessionIdle(IoSession session, IdleStatus status) throws Exception {
        System.out.println("IDLE " + session.getIdleCount(status));
    }

    @Override
    public void messageSent(IoSession session, Object message) throws Exception {
        log.info(session.getLocalAddress() + "------" + session.getRemoteAddress() + "messageSent");
    }
}

Server

Server execution flow:

AbstractIoAcceptor.bind(SocketAddress... addresses) -> AbstractPollingIoAcceptor.bindInternal(List<? extends SocketAddress> localAddresses) -> AbstractPollingIoAcceptor.startupAcceptor() is started -> AbstractIoService.executeWorker(Runnable worker) runs the acceptor, AbstractPollingIoAcceptor.Acceptor, on a thread pool -> the acceptor polls for accept readiness -> when a connection arrives, the ServerSocketChannel is taken from the selection keys returned by NioSocketAcceptor.selectedHandles() -> AbstractPollingIoAcceptor.accept(IoProcessor<T> processor, H handle) -> ServerSocketChannel.accept() returns the SocketChannel -> an IoSession is constructed as a NioSocketSession -> SimpleIoProcessorPool.add(T session) picks a NioProcessor from the processor pool -> AbstractPollingIoProcessor.add(T session) (the read-readiness event is registered in org.apache.mina.transport.socket.nio.NioProcessor.init(NioSession)) -> the reactor is started by AbstractPollingIoProcessor.startupProcessor() -> NioProcessor.select(long timeout) polls for read readiness -> a read-ready event triggers AbstractPollingIoProcessor.process(T session) -> AbstractPollingIoProcessor.read(T session) -> IoFilterChain.fireMessageReceived(Object message) enters the filter chain, which was already assembled when the acceptor was constructed.

(The server first creates a single acceptor. After read/write events are triggered, the acceptor takes an AbstractPollingIoProcessor from the SimpleIoProcessorPool and creates a single AbstractPollingIoProcessor.Processor associated with it. The pool usually holds CPU cores + 1 AbstractPollingIoProcessor instances, which is the optimal thread count when business processing never blocks. Without an ExecutorFilter, the reactor Processor and the business logic run on the same thread, which is the single-threaded model.)

-> When the server receives a request, it is read in MyIoHandler.messageReceived(IoSession session, Object message), which sends the response with IoSession.write(Object message) -> this goes straight into the filter chain via IoFilterChain.fireFilterWrite(WriteRequest writeRequest) -> IoFilter.filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) -> the codec, ProtocolCodecFilter.filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) -> the last link, where the real write is initiated, DefaultIoFilterChain.HeadFilter.filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) -> the request is placed on the write queue by WriteRequestQueue.offer(IoSession session, WriteRequest writeRequest) -> the IoSession again consults its processor pool, SimpleIoProcessorPool.getProcessor(T session) -> the processor runs AbstractPollingIoProcessor.flush(T session); flush does not perform the network write itself, it only wakes the selector with Selector.wakeup() -> AbstractPollingIoProcessor.flush(long currentTime) -> AbstractPollingIoProcessor.writeBuffer(T session, WriteRequest req, boolean hasFragmentation, int maxLength, long currentTime) -> NioProcessor.write(NioSession session, IoBuffer buf, int length), which finally calls WritableByteChannel.write(ByteBuffer src) to perform the actual network I/O.

(The selector's write-ready event fires after the write has completed. In other words, to trigger a network write we do not rely on the selector's write event; we have to create the condition ourselves, for example by checking the write queue inside the reactor to decide whether to write. The selector's write event is the result of a write we initiated. The network layers of MINA and ZooKeeper are good references.)
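The queue-then-wakeup write path described above can be sketched with java.nio alone. This is a minimal illustration, not MINA's implementation: QueueDrivenWriter and roundTrip are hypothetical names, a java.nio.channels.Pipe stands in for the socket so the example needs no network, and the selector has no registered keys because it is used only for its wakeup behaviour.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.Selector;
import java.nio.charset.StandardCharsets;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical sketch, not MINA code. A Pipe stands in for the socket.
final class QueueDrivenWriter implements Runnable {
    private final Selector selector;
    private final Pipe pipe;
    private final Queue<ByteBuffer> writeQueue = new ConcurrentLinkedQueue<>();
    private volatile boolean running = true;

    QueueDrivenWriter() throws IOException {
        selector = Selector.open();
        pipe = Pipe.open();
    }

    // Caller side: enqueue and wake the reactor. No channel I/O happens here.
    void write(String message) {
        writeQueue.offer(ByteBuffer.wrap(message.getBytes(StandardCharsets.UTF_8)));
        selector.wakeup();
    }

    // Reactor side: each time select() returns, drain the queue and write.
    @Override
    public void run() {
        try {
            while (running) {
                selector.select(1000);
                ByteBuffer buf;
                while ((buf = writeQueue.poll()) != null) {
                    while (buf.hasRemaining()) {
                        pipe.sink().write(buf); // the actual channel write
                    }
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Starts a reactor, sends one message through it and reads it back.
    static String roundTrip(String message) {
        try {
            QueueDrivenWriter writer = new QueueDrivenWriter();
            Thread reactor = new Thread(writer, "reactor");
            reactor.start();
            writer.write(message);

            ByteBuffer in = ByteBuffer.allocate(message.getBytes(StandardCharsets.UTF_8).length);
            while (in.hasRemaining() && writer.pipe.source().read(in) >= 0) {
                // keep reading until the whole message has arrived
            }
            writer.running = false;
            writer.selector.wakeup();
            reactor.join();
            writer.selector.close();
            writer.pipe.sink().close();
            writer.pipe.source().close();
            return new String(in.array(), 0, in.position(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("hello server"));
    }
}
```

The point of the design is that write() is cheap and thread-safe for any caller, while all channel writes stay on the reactor thread, driven by the state of the queue rather than by a write-readiness event.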

IoAcceptor acceptor = new NioSocketAcceptor();
acceptor.getSessionConfig().setReadBufferSize(2048);
acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10);
acceptor.getFilterChain().addLast("codec",
        new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8"),
                LineDelimiter.WINDOWS.getValue(), LineDelimiter.WINDOWS.getValue())));
acceptor.setHandler(new MyIoHandler());
acceptor.bind(new InetSocketAddress(9123));
while (true) {
    Thread.sleep(10000);
}

public class MyIoHandler extends IoHandlerAdapter {
    private final static Logger log = LoggerFactory.getLogger(MyIoHandler.class);

    @Override
    public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
        cause.printStackTrace();
    }

    @Override
    public void messageReceived(IoSession session, Object message) throws Exception {
        String str = message.toString();
        log.info("The request message received length is [" + str.length() * 2 + "bytes ]");
        log.info("The request message received is [" + str + "]");
        if (str.endsWith("quit")) {
            session.close(true);
            return;
        }
        session.write(message);
    }

    @Override
    public void sessionCreated(IoSession session) throws Exception {
        log.info(session.getLocalAddress() + "------" + session.getRemoteAddress() + "sessionCreated");
    }

    @Override
    public void sessionClosed(IoSession session) throws Exception {
        log.info(session.getLocalAddress() + "------" + session.getRemoteAddress() + "sessionClosed");
    }

    @Override
    public void sessionOpened(IoSession session) throws Exception {
        log.info(session.getLocalAddress() + "------" + session.getRemoteAddress() + "sessionOpened");
    }

    @Override
    public void sessionIdle(IoSession session, IdleStatus status) throws Exception {
        // System.out.println("IDLE " + session.getIdleCount(status));
    }

    @Override
    public void messageSent(IoSession session, Object message) throws Exception {
        log.info(session.getLocalAddress() + "------" + session.getRemoteAddress() + "messageSent");
    }
}


Server-side network I/O state transitions: the acceptor's accept event becomes ready -> sessionCreated -> sessionOpened -> flushingSessions is empty -> the read event of AbstractPollingIoProcessor.Processor becomes ready -> messageReceived -> IoSession.write(Object message) -> flushingSessions.add(session), the session is removed from flushingSessions, session.getChannel().write(buf.buf()) runs, and the client's messageReceived fires -> the write-ready event of AbstractPollingIoProcessor.Processor fires -> flushingSessions.add(session), the session is removed from flushingSessions, AbstractPollingIoProcessor.writeBuffer(T session, WriteRequest req, boolean hasFragmentation, int maxLength, long currentTime) runs, then fireMessageSent -> messageSent is called once the message has been sent successfully.

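flushingSessions acts as a synchronizer for write operations and decides when write-related work is performed. fireMessageSent is triggered only when the following condition holds:

if (!buf.hasRemaining() || !hasFragmentation && localWrittenBytes != 0) {
    // Buffer has been sent, clear the current request.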
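The completion condition can be isolated into a small predicate to see which cases count as "sent". This is a sketch for illustration only: WriteCompletion and isRequestDone are hypothetical names, and a plain java.nio.ByteBuffer is used in place of MINA's IoBuffer.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch: the condition from the text as a standalone predicate.
final class WriteCompletion {

    // True when the request counts as fully sent:
    //  - nothing remains in the buffer, or
    //  - the transport does not fragment and at least one byte was written.
    static boolean isRequestDone(ByteBuffer buf, boolean hasFragmentation, int localWrittenBytes) {
        return !buf.hasRemaining() || (!hasFragmentation && localWrittenBytes != 0);
    }

    public static void main(String[] args) {
        ByteBuffer drained = ByteBuffer.allocate(8);
        drained.position(drained.limit()); // nothing left to send
        ByteBuffer partial = ByteBuffer.allocate(8);
        partial.position(3);               // 5 bytes still pending

        System.out.println(isRequestDone(drained, true, 8));  // true
        System.out.println(isRequestDone(partial, true, 3));  // false
        System.out.println(isRequestDone(partial, false, 3)); // true
        System.out.println(isRequestDone(partial, false, 0)); // false
    }
}
```

Read this way, on a transport that fragments writes a partially written buffer keeps the request pending, so the messageSent notification waits until the buffer is drained.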

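The client works the same way. The only difference is that one side uses a connector and the other an acceptor to establish the TCP connection; AbstractPollingIoProcessor.Processor handles the reads and writes once the connection is established.

2 Multi-threaded model

Client

Every successful connect produces a connection session.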

ExecutorService threadPool = Executors.newFixedThreadPool(3);
for (int i = 0; i < threadNum; i++) {
    threadPool.execute(new ClientConnetion(connector));
}

class ClientConnetion implements Runnable {
    private IoConnector connector;

    public ClientConnetion(IoConnector connector) {
        this.connector = connector;
    }

    @Override
    public void run() {
        connector.connect(new InetSocketAddress("localhost", PORT));
    }
}

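Server

If the business logic never blocks, the default configuration of n + 1 AbstractPollingIoProcessor.Processor threads (n being the number of CPUs) is optimal, with I/O and business logic handled on the same thread.

When the business logic blocks, the multi-threaded model should be used.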
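The idea of moving blocking work off the I/O thread can be shown with a plain thread pool. This is a sketch under stated assumptions, not MINA code: HandoffSketch and handleAll are hypothetical names, the calling thread plays the role of the I/O thread, and Thread.sleep stands in for blocking business logic.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch: the caller only enqueues requests, worker threads block.
final class HandoffSketch {

    // Hands `requests` blocking handlers to a pool of `workers` threads and
    // returns the names of the threads that actually ran them.
    static Set<String> handleAll(int requests, int workers, long blockMillis) {
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        Set<String> threads = ConcurrentHashMap.newKeySet();
        CountDownLatch done = new CountDownLatch(requests);

        for (int i = 0; i < requests; i++) {
            // The calling ("I/O") thread returns immediately after enqueueing.
            pool.execute(() -> {
                try {
                    Thread.sleep(blockMillis); // stands in for blocking business logic
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    threads.add(Thread.currentThread().getName());
                    done.countDown();
                }
            });
        }
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        pool.shutdown();
        return threads;
    }

    public static void main(String[] args) {
        Set<String> used = handleAll(4, 4, 50);
        System.out.println("caller:  " + Thread.currentThread().getName());
        System.out.println("workers: " + used);
    }
}
```

With as many workers as requests, the blocking periods overlap instead of queueing up behind one another on a single thread.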

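On a single machine, with the client issuing 10 concurrent requests and the server spending 100 ms computing and 1000 ms blocked per request, processing 100 requests took 48797 ms in total.

Under ideal conditions, the optimal thread count follows from the formula (blocking time / compute time + 1) * number of CPU cores. For the case above, about 20 service threads is a suitable number. In the same environment, with the server switched to the multi-threaded model, processing 100 requests took 15593 ms in total.

(Other measurements: 5 threads, 22984 ms; 7 threads, 22266 ms; 10 threads, 17906 ms; 15 threads, 16984 ms; 20 threads, 15593 ms; 22 threads, 16906 ms; 25 threads, 15969 ms; 30 threads, 17782 ms; 35 threads, 17891 ms. The environment was not ideal, so the figures vary somewhat and are for reference only.)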
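The sizing formula above can be written as a small helper. This is only a sketch: ThreadSizing and optimalThreads are hypothetical names, and the two-core figure mentioned afterwards is an assumption, since the article does not state how many cores the test machine had.

```java
// Hypothetical helper for the rule (blocking time / compute time + 1) * cores.
final class ThreadSizing {

    static int optimalThreads(long blockingMillis, long computeMillis, int cores) {
        if (computeMillis <= 0 || cores <= 0) {
            throw new IllegalArgumentException("computeMillis and cores must be positive");
        }
        double ratio = (double) blockingMillis / computeMillis;
        return (int) Math.round((ratio + 1.0) * cores);
    }

    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        // 1000 ms blocked and 100 ms computing per request, as in the test above.
        System.out.println(optimalThreads(1000, 100, cores));
    }
}
```

With 1000 ms blocked, 100 ms computing and an assumed two cores, the helper returns 22, which is in the same range as the 20 threads that gave the best measured time.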

The server setup is as follows:

IoAcceptor acceptor = new NioSocketAcceptor();
acceptor.getSessionConfig().setReadBufferSize(2048);
acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10);
// Hand events to a 20-thread pool so blocking handlers do not stall the I/O processor.
acceptor.getFilterChain().addLast("executor1", new ExecutorFilter(20, 20));
acceptor.getFilterChain().addLast("codec",
        new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8"),
                LineDelimiter.WINDOWS.getValue(), LineDelimiter.WINDOWS.getValue())));
acceptor.setHandler(new MyIoHandler());
acceptor.bind(new InetSocketAddress(9123));
while (true) {
    Thread.sleep(10000);
}

Business handler

public class MyIoHandler extends IoHandlerAdapter {
    private final static Logger log = LoggerFactory.getLogger(MyIoHandler.class);

    @Override
    public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
        cause.printStackTrace();
    }

    @Override
    public void messageReceived(IoSession session, Object message) throws Exception {
        String str = message.toString();
        log.info("The request message received length is [" + str.length() * 2 + "bytes ]");
        log.info("The request message received is [" + str + "]");
        if (str.endsWith("quit")) {
            session.close(true);
            return;
        }
        cptTime(100);
        blkTime(1000);
        session.write(message);
    }

    private void cptTime(long time) {
        long num = 1000000 * time;
        int temp = 0;
        // 100 ms
        // long start = System.currentTimeMillis();
        for (int i = 0; i < num; i++) {
            temp++;
        }
        // System.out.println(System.currentTimeMillis() - start);
    }

    private void blkTime(long time) {
        try {
            Thread.sleep(time);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

    @Override
    public void sessionCreated(IoSession session) throws Exception {
        log.info(session.getLocalAddress() + "------" + session.getRemoteAddress() + "sessionCreated");
    }

    @Override
    public void sessionClosed(IoSession session) throws Exception {
        log.info(session.getLocalAddress() + "------" + session.getRemoteAddress() + "sessionClosed");
    }

    @Override
    public void sessionOpened(IoSession session) throws Exception {
        log.info(session.getLocalAddress() + "------" + session.getRemoteAddress() + "sessionOpened");
    }

    @Override
    public void sessionIdle(IoSession session, IdleStatus status) throws Exception {
        // System.out.println("IDLE " + session.getIdleCount(status));
    }

    @Override
    public void messageSent(IoSession session, Object message) throws Exception {
        log.info(session.getLocalAddress() + "------" + session.getRemoteAddress() + "messageSent");
    }
}

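Adding acceptor.getFilterChain().addLast("executor1", new ExecutorFilter(22, 22)) is enough to implement the SEDA model. Each phase of business processing is abstracted as a stage; every stage has a thread pool sized to its optimal thread count, and the stages are linked by event queues. ExecutorFilter is the key piece that makes this possible.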
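The stage-and-queue structure can be sketched with two thread pools chained together. This is an illustration of the SEDA idea only, not of ExecutorFilter's internals: SedaSketch, Stage and run are hypothetical names, the pool sizes are arbitrary, and each pool's internal work queue plays the role of the stage's event queue. Inputs are assumed to be valid integers.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical sketch of a two-stage SEDA pipeline.
final class SedaSketch {

    // One stage: a thread pool whose internal work queue is the event queue.
    static final class Stage<I, O> {
        private final ExecutorService pool;
        private final Function<I, O> work;
        private final Consumer<O> next;

        Stage(int threads, Function<I, O> work, Consumer<O> next) {
            this.pool = Executors.newFixedThreadPool(threads);
            this.work = work;
            this.next = next;
        }

        // Enqueue an event; a stage thread processes it and passes the result on.
        void submit(I event) {
            pool.execute(() -> next.accept(work.apply(event)));
        }

        void shutdown() {
            pool.shutdown();
        }
    }

    // Decode stage (String -> Integer) feeding a business stage (square).
    static List<Integer> run(List<String> inputs) {
        List<Integer> results = Collections.synchronizedList(new ArrayList<Integer>());
        CountDownLatch done = new CountDownLatch(inputs.size());

        Stage<Integer, Integer> business = new Stage<Integer, Integer>(4, n -> n * n, r -> {
            results.add(r);
            done.countDown();
        });
        Stage<String, Integer> decode = new Stage<String, Integer>(2, Integer::parseInt, business::submit);

        for (String input : inputs) {
            decode.submit(input);
        }
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            decode.shutdown();
            business.shutdown();
        }
        List<Integer> sorted = new ArrayList<>(results);
        Collections.sort(sorted); // completion order is not deterministic
        return sorted;
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList("3", "1", "2")));
    }
}
```

Because each stage has its own pool, a slow or blocking stage can be given more threads without changing the others, which is the tuning knob the thread-count formula is meant for.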


