mina基礎及服務端接口測試
一、mina是什么
官方解釋:Apache的Mina(Multipurpose Infrastructure Networked Applications)是一個網絡應用框架,可以幫助用戶開發高性能和高擴展性的網絡應用程序;它提供了一個抽象的、事件驅動的異步API,使Java NIO在各種傳輸協議(如TCP/IP,UDP/IP協議等)下快速高效開發。
官網地址:http://mina.apache.org/
源碼分析:http://my.oschina.net/ielts0909/blog/90355/
二、mina的工作流程
從上圖可以看出,mina分為客戶端和服務端,客戶端建立連接,同時開啟一個IoProcessor線程,服務端監聽連接,客戶端和服務端的連接周期由IoSession管理,IoFilter用來過濾消息,而IoHandler則主要用于業務的處理。
三、主要的類
1、IoService
IoService是創建服務的頂層接口,無論客戶端還是服務端,都是從它繼承實現的。
以下是創建服務端的代碼
try { acceptor = new NioSocketAcceptor();\\創建一個服務端 acceptor.getFilterChain().addLast( "codec", new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8"), LineDelimiter.WINDOWS.getValue(), LineDelimiter.WINDOWS.getValue()))); \\綁定一個解碼器 acceptor.getFilterChain().addLast("logger",new LoggingFilter()); \\綁定一個日志處理器 acceptor.getSessionConfig().setReadBufferSize(2048);\\設置讀緩沖區大小 acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10);\\設置讀寫空閑進入時間 acceptor.setHandler(new IoHandlerAdapter());\\設置業務處理器 acceptor.bind(new InetSocketAddress(port));\\綁定端口 logger.info("服務端啟動成功... 端口號為:" + port); } catch (Exception e) { logger.error("服務端啟動異常....", e); e.printStackTrace(); } |
不難看出,要建立一個服務端的代碼很簡單,這比Java NIO或單純用socket編程來的簡單很多。
2、IoProcessor
對于一個IoAcceptor或IoConnector線程對應一個IoProcessor線程用于IO的處理,這個IoProcessor線程從IoProcessor線程池中取出。IoProcessor線程池的大小默認為機器的CPU核數+1,例如雙核機器的IoProcessor的線程池大小默認為3,通過添加ExecutorFilter可以設置線程池,每個IoProcessor管理多個IoSession
acceptor.getFilterChain().addLast("threadpool",new ExecutorFilter(corePoolSize, maximumPoolSize, keepAliveTime,unit)
3、IoSession
IoSession是用來保持IoService的上下文,一個IoService在建立連接之后建立一個IoSession(一個連接一個session),IoSession的生命周期從Connection建立到斷開為止。
主要功能:
1、管理連接(session.getSessionConfig.XX)。這里的管理連接并不是直接去控制我們上次講的最底層的連接acceptor和connector。如果acceptor和connector建立的一條管道,那session就是在管道內的管理者,他是沒有辦法將管道對半拆分開的,他只能從內部阻斷兩邊的通信。管理連接還有部分就是可以配置緩沖區的大小,閑置時間等等。
2、存儲信息(session.setAttribute())。和web里的session一樣,這里的session也有存儲attribute的功能,不過一般來說,這里存儲的都是和連接有關的東西,并不會像web開發一樣存一些業務上的東西。
3、驅動讀寫操作。如session.write()。
4、統計功能。Session還記錄了連接中的byte、message等數量。
4、IoHandler
public interface IoHandler { void sessionCreated(IoSession session) throws Exception; void sessionOpened(IoSession session) throws Exception; void sessionClosed(IoSession session) throws Exception; void sessionIdle(IoSession session, IdleStatus status) throws Exception; void exceptionCaught(IoSession session, Throwable cause) throws Exception; void messageReceived(IoSession session, Object message) throws Exception; void messageSent(IoSession session, Object message) throws Exception; } |
一般情況下,我們最關心的只有messageReceived方法,接收消息并處理,然后調用IoSession的write方法發送出消息。一般情況下很少有人實現IoHandler接口,而是繼承它的一個實現類IoHandlerAdapter,這樣不用覆蓋它的7個方法,只需要根據具體需求覆蓋其中的幾個方法就可以。
5、IoFilter
Mina最主要的工作就是把底層傳輸的字節碼轉換為Java對象,提供給應用程序;或者把應用程序返回的結果轉換為字節碼,交給底層傳輸。這些都是由IoFilter完成的。
Filter,過濾器的意思。IoFilter,I/O操作的過濾器。IoFilter和Servlet中的過濾器一樣,主要用于攔截和過濾網絡傳輸中I/O操作的各種消息。
IoService實例會綁定一個DefaultIoFilterChainBuilder ---- 過濾器鏈,我們把自定義的各種過濾器(IoFilter)自由的插放在這個過濾器鏈上了。
Mina中自帶的解碼器:
CumulativeProtocolDecoder 累積性解碼器
SynchronizedProtocolDecoder 這個解碼器用于將任何一個解碼器包裝為一個線程安全的解碼器,用于解決每次執行decode()方法時可能線程不是上一次的線程的問題,但這樣會在高并發時,大大降低系統的性能。
TextLineDecoder 按照文本的換行符( Windows:\r\n 、Linux:\n、Mac:\r)解碼數據。
如何自定義編解碼?
1.繼承相關編解碼器類,重寫實現doDecode/doEncode方法
2.重寫ProtocolCodecFactory工廠類
三、如何測試mina的服務端
1、創建自己的mina客戶端
public IoConnector creatClient() { NioSocketConnector connector = null; try { connector = new NioSocketConnector(); connector.getSessionConfig().setReadBufferSize(1024 * 1024 * 5); connector.getSessionConfig().setBothIdleTime(10); connector.getSessionConfig().setKeepAlive(true); connector.setHandler(new MyIoHandler()); connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(new DecodeFactory())); connector.setConnectTimeoutMillis(5000); } catch (Exception e) { e.printStackTrace(); } return connector; } |
2、重寫Hanlder類,其中最重要的messageReciever方法需要先寫好如何處理收到的數據,包括斷言等
@Override public void sessionCreated(IoSession session) throws Exception { logger.info("服務端與客戶端創建連接..."); } @Override public void sessionOpened(IoSession session) throws Exception { logger.info("服務端與客戶端連接打開..."+ "當前第" + session.getId() + "個客戶端"); } @Override public void messageReceived(IoSession session, Object message) throws Exception { if (message instanceof IoBuffer) { ServerResponse.getResponseInfo(session,(IoBuffer)message); } } @Override public void messageSent(IoSession session, Object message) throws Exception { logger.info("服務端發送信息成功..."); } @Override public void sessionClosed(IoSession session) throws Exception { logger.info("服務端與客戶端連接斷開..."); } @Override public void sessionIdle(IoSession session, IdleStatus status) throws Exception { logger.info("服務端進入空閑狀態..."); } @Override public void exceptionCaught(IoSession session, Throwable cause) throws Exception { logger.error("服務端發送異常...", cause); } |
3、針對服務端通信規則重寫編解碼方法,支持斷包粘包
public class ClientDecoder extends CumulativeProtocolDecoder { public static Logger logger = Logger.getLogger(ClientDecoder.class); @Override protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception { IoBuffer buf = IoBuffer.allocate(0).setAutoExpand(true); IoBuffer buffer = IoBuffer.allocate(0).setAutoExpand(true); byte [] head = new byte[16]; logger.info(in.remaining()); if (in.remaining() > 0) { int msgLen = 16;// 16字節判斷消息長度 if (in.remaining() > msgLen) { in.mark(); in.get(head, 0, 16); buf.put(XorParamCodec.decryptXOR(head)); buf.flip(); int length=buf.getInt(); if (length - msgLen > in.remaining()) { in.reset(); return false; } else { in.position(0); while (in.hasRemaining()) { buffer.put(in.get()); } buffer.flip(); out.write(buffer); return false; } } } return false; } } |
4、編寫Test發送數據
@Before public void beforeTest() { ip = PropertiesHandle.readValue("ip"); port = Integer.valueOf(PropertiesHandle.readValue("port")); mmc = new MyMinaClient(); } @Test public void testUpdate() throws Exception { IoConnector connector = mmc.creatClient(); IoSession session = mmc.getIoSession(connector, ip, port); mmc.sendMsg(session, ClientRegister.getRegisterInfo()); \\ClientRegister和ClientUpdate類是客戶端發送的數據 mmc.sendMsg(session, ClientUpdate.getUpdateInfo()); mmc.close(session, connector); Assert.assertEquals(AcctGuardData.Registerr_Rec, HeaderInfo.response); } |