paulwong

          基于Apache Mina實(shí)現(xiàn)的TCP長(zhǎng)連接和短連接實(shí)例

          1、前言

          Apache MINA是Apache組織的一個(gè)優(yōu)秀的項(xiàng)目。MINA是Multipurpose Infrastructure for NetworkApplications的縮寫。它是一個(gè)網(wǎng)絡(luò)應(yīng)用程序框架,用來(lái)幫助用戶非常方便地開發(fā)高性能和高可靠性的網(wǎng)絡(luò)應(yīng)用程序。在本文中介紹了 如何通過(guò)Apache Mina2.0來(lái)實(shí)現(xiàn)TCP協(xié)議長(zhǎng)連接和短連接應(yīng)用。

          2、系統(tǒng)介紹

          2.1系統(tǒng)框架

          整個(gè)系統(tǒng)由兩個(gè)服務(wù)端程序和兩個(gè)客戶端程序組成。分別實(shí)現(xiàn)TCP長(zhǎng)連接和短連接通信。

          系統(tǒng)業(yè)務(wù)邏輯是一個(gè)客戶端與服務(wù)端建立長(zhǎng)連接,一個(gè)客戶端與服務(wù)端建立短連接。數(shù)據(jù)從短連接客戶端經(jīng)過(guò)服務(wù)端發(fā)送到長(zhǎng)連接客戶端,并從長(zhǎng)連接客戶端接收響應(yīng)數(shù)據(jù)。當(dāng)收到響應(yīng)數(shù)據(jù)后斷開連接。

          系統(tǒng)架構(gòu)圖如下:


          2.2處理流程

          系統(tǒng)處理流程如下:

          1) 啟動(dòng)服務(wù)端程序,監(jiān)聽8001和8002端口。

          2) 長(zhǎng)連接客戶端向服務(wù)端8002端口建立連接,服務(wù)端將連接對(duì)象保存到共享內(nèi)存中。由于采用長(zhǎng)連接方式,連接對(duì)象是唯一的。

          3) 短連接客戶端向服務(wù)端8001端口建立連接。建立連接后創(chuàng)建一個(gè)連接對(duì)象。

          4) 短連接客戶端連接成功后發(fā)送數(shù)據(jù)。服務(wù)端接收到數(shù)據(jù)后從共享內(nèi)存中得到長(zhǎng)連接方式的連接對(duì)象,使用此對(duì)象向長(zhǎng)連接客戶端發(fā)送數(shù)據(jù)。發(fā)送前將短連接對(duì)象設(shè)為長(zhǎng)連接對(duì)象的屬性值。

          5) 長(zhǎng)連接客戶端接收到數(shù)據(jù)后返回響應(yīng)數(shù)據(jù)。服務(wù)端從長(zhǎng)連接對(duì)象的屬性中取得短連接對(duì)象,通過(guò)此對(duì)象將響應(yīng)數(shù)據(jù)發(fā)送給短連接客戶端。

          6) 短連接客戶端收到響應(yīng)數(shù)據(jù)后,關(guān)閉連接。

          3、服務(wù)端程序

          3.1長(zhǎng)連接服務(wù)端

          服務(wù)啟動(dòng)

          public class MinaLongConnServer {

          private static final int PORT = 8002;



          public void start()throws IOException{

          IoAcceptor acceptor = new NioSocketAcceptor();



          acceptor.getFilterChain().addLast("logger", new LoggingFilter());

          acceptor.getFilterChain().addLast("codec", newProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8"))));



          acceptor.setHandler(new MinaLongConnServerHandler());

          acceptor.getSessionConfig().setReadBufferSize(2048);

          acceptor.bind(new InetSocketAddress(PORT));

          System.out.println("Listeningon port " + PORT);

          }

          }

          //消息處理

          public class MinaLongConnServerHandler extends IoHandlerAdapter {

          private final Logger logger = (Logger) LoggerFactory.getLogger(getClass());



          @Override

          public void sessionOpened(IoSession session) {

          InetSocketAddress remoteAddress = (InetSocketAddress)session.getRemoteAddress();

          String clientIp = remoteAddress.getAddress().getHostAddress();

          logger.info("LongConnect Server opened Session ID ="+String.valueOf(session.getId()));

          logger.info("接收來(lái)自客戶端 :" + clientIp + "的連接.");

          Initialization init = Initialization.getInstance();

          HashMap<String, IoSession> clientMap =init.getClientMap();

          clientMap.put(clientIp, session);

          }



          @Override

          public void messageReceived(IoSession session, Object message) {

          logger.info("Messagereceived in the long connect server..");

          String expression = message.toString();

          logger.info("Message is:" + expression);

          IoSession shortConnSession =(IoSession) session.getAttribute("shortConnSession");

          logger.info("ShortConnect Server Session ID ="+String.valueOf(shortConnSession.getId()));

          shortConnSession.write(expression);

          }



          @Override

          public void sessionIdle(IoSession session, IdleStatus status) {

          logger.info("Disconnectingthe idle.");

          // disconnect an idle client

          session.close(true);

          }



          @Override

          public void exceptionCaught(IoSession session, Throwable cause) {

          // close the connection onexceptional situation

          logger.warn(cause.getMessage(), cause);

          session.close(true);

          }

          }

          3.2短連接服務(wù)端

          服務(wù)啟動(dòng)

          public class MinaShortConnServer {

          private static final int PORT = 8001;



          public void start()throws IOException{

          IoAcceptor acceptor = new NioSocketAcceptor();



          acceptor.getFilterChain().addLast("logger", new LoggingFilter());

          acceptor.getFilterChain().addLast("codec", newProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8"))));



          acceptor.setHandler(new MinaShortConnServerHandler());

          acceptor.getSessionConfig().setReadBufferSize(2048);

          acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 3);

          acceptor.bind(new InetSocketAddress(PORT));

          System.out.println("Listeningon port " + PORT);

          }

          }

          消息處理

          public class MinaShortConnServerHandler extends IoHandlerAdapter {

          private final Logger logger = (Logger) LoggerFactory.getLogger(getClass());



          @Override

          public void sessionOpened(IoSession session) {

          InetSocketAddress remoteAddress = (InetSocketAddress)session.getRemoteAddress();

          logger.info(remoteAddress.getAddress().getHostAddress());

          logger.info(String.valueOf(session.getId()));

          }



          @Override

          public void messageReceived(IoSession session, Object message) {

          logger.info("Messagereceived in the short connect server");

          String expression = message.toString();

          Initialization init = Initialization.getInstance();

          HashMap<String, IoSession> clientMap =init.getClientMap();

          if (clientMap == null || clientMap.size() == 0) {

          session.write("error");

          else {

          IoSession longConnSession = null;

          Iterator<String> iterator =clientMap.keySet().iterator();

          String key = "";

          while (iterator.hasNext()) {

          key = iterator.next();

          longConnSession = clientMap.get(key);

          }

          logger.info("ShortConnect Server Session ID :"+String.valueOf(session.getId()));

          logger.info("LongConnect Server Session ID :"+String.valueOf(longConnSession.getId()));

          longConnSession.setAttribute("shortConnSession",session);

          longConnSession.write(expression);

          }

          }



          @Override

          public void sessionIdle(IoSession session, IdleStatus status) {

          logger.info("Disconnectingthe idle.");

          // disconnect an idle client

          session.close(true);

          }



          @Override

          public void exceptionCaught(IoSession session, Throwable cause) {

          // close the connection onexceptional situation

          logger.warn(cause.getMessage(), cause);

          session.close(true);

          }

          }



          4、客戶端程序

          4.1長(zhǎng)連接客戶端

          使用java.net.Socket來(lái)實(shí)現(xiàn)向服務(wù)端建立連接。Socket建立后一直保持連接,從服務(wù)端接收到數(shù)據(jù)包后直接將原文返回。

          public class TcpKeepAliveClient {

          private String ip;

          private int port;

          private static Socket socket = null;

          private static int timeout = 50 * 1000;



          public TcpKeepAliveClient(String ip, int port) {

          this.ip = ip;

          this.port = port;

          }



          public void receiveAndSend() throws IOException {

          InputStream input = null;

          OutputStream output = null;



          try {

          if (socket == null || socket.isClosed() || !socket.isConnected()) {

          socket = new Socket();

          InetSocketAddress addr = new InetSocketAddress(ip, port);

          socket.connect(addr, timeout);

          socket.setSoTimeout(timeout);

          System.out.println("TcpKeepAliveClientnew ");

          }



          input = socket.getInputStream();

          output = socket.getOutputStream();



          // read body

          byte[] receiveBytes = {};// 收到的包字節(jié)數(shù)組

          while (true) {

          if (input.available() > 0) {

          receiveBytes = new byte[input.available()];

          input.read(receiveBytes);



          // send

          System.out.println("TcpKeepAliveClientsend date :" + new String(receiveBytes));

          output.write(receiveBytes, 0, receiveBytes.length);

          output.flush();

          }

          }



          catch (Exception e) {

          e.printStackTrace();

          System.out.println("TcpClientnew socket error");

          }

          }



          public static void main(String[] args) throws Exception {

          TcpKeepAliveClient client = new TcpKeepAliveClient("127.0.0.1", 8002);

          client.receiveAndSend();

          }



          }

          4.2短連接客戶端

          服務(wù)啟動(dòng)

          public class MinaShortClient {

          private static final int PORT = 8001;



          public static void main(String[] args) throws IOException,InterruptedException {

          IoConnector connector = new NioSocketConnector();

          connector.getSessionConfig().setReadBufferSize(2048);



          connector.getFilterChain().addLast("logger", new LoggingFilter());

          connector.getFilterChain().addLast("codec", newProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8"))));



          connector.setHandler(new MinaShortClientHandler());

          for (int i = 1; i <= 10; i++) {

          ConnectFuture future = connector.connect(new InetSocketAddress("127.0.0.1", PORT));

          future.awaitUninterruptibly();

          IoSession session =future.getSession();

          session.write(i);

          session.getCloseFuture().awaitUninterruptibly();



          System.out.println("result=" + session.getAttribute("result"));

          }

          connector.dispose();



          }

          }

          消息處理

          public class MinaShortClientHandler extends IoHandlerAdapter{

          private final Logger logger = (Logger) LoggerFactory.getLogger(getClass());



          public MinaShortClientHandler() {



          }



          @Override

          public void sessionOpened(IoSession session) {

          }



          @Override

          public void messageReceived(IoSession session, Object message) {

          logger.info("Messagereceived in the client..");

          logger.info("Message is:" + message.toString());

          session.setAttribute("result", message.toString());

          session.close(true);

          }



          @Override

          public void exceptionCaught(IoSession session, Throwable cause) {

          session.close(true);

          }

          }

          5、總結(jié)

          通過(guò)本文中的例子,Apache Mina在服務(wù)端可實(shí)現(xiàn)TCP協(xié)議長(zhǎng)連接和短連接。在客戶端只實(shí)現(xiàn)了短連接模式,長(zhǎng)連接模式也是可以實(shí)現(xiàn)的(在本文中還是采用傳統(tǒng)的java Socket方式)。兩個(gè)服務(wù)端之間通過(guò)共享內(nèi)存的方式來(lái)傳遞連接對(duì)象也許有更好的實(shí)現(xiàn)方式。

          posted on 2013-05-11 21:56 paulwong 閱讀(560) 評(píng)論(0)  編輯  收藏 所屬分類: MINA

          主站蜘蛛池模板: 沁源县| 新宾| 昭平县| 亳州市| 红安县| 泾川县| 利川市| 莆田市| 出国| 虞城县| 陈巴尔虎旗| 华亭县| 潢川县| 礼泉县| 泰顺县| 罗田县| 本溪| 福海县| 铜山县| 厦门市| 海口市| 荥经县| 商河县| 南木林县| 莆田市| 乌兰浩特市| 浦县| 乐至县| 禄丰县| 紫金县| 无棣县| 醴陵市| 方山县| 西藏| 澄迈县| 辽宁省| 习水县| 玉环县| 图片| 衡阳市| 涞水县|