基于Apache Mina實現的TCP長連接和短連接實例
1、前言 Apache MINA是Apache組織的一個優秀的項目。MINA是Multipurpose Infrastructure for Network Applications的縮寫。它是一個網絡應用程序框架,用來幫助用戶非常方便地開發高性能和高可靠性的網絡應用程序。本文介紹了如何通過Apache Mina 2.0來實現TCP協議長連接和短連接應用。
2、系統介紹
2.1 系統框架
整個系統由兩個服務端程序和兩個客戶端程序組成,分別實現TCP長連接和短連接通信。
系統業務邏輯是一個客戶端與服務端建立長連接,一個客戶端與服務端建立短連接。數據從短連接客戶端經過服務端發送到長連接客戶端,并從長連接客戶端接收響應數據。當收到響應數據后斷開連接。
系統架構圖如下:
2.2處理流程
系統處理流程如下:
1) 啟動服務端程序,監聽8001和8002端口。
2) 長連接客戶端向服務端8002端口建立連接,服務端將連接對象保存到共享內存中。由于采用長連接方式,連接對象是唯一的。
3) 短連接客戶端向服務端8001端口建立連接。建立連接后創建一個連接對象。
4) 短連接客戶端連接成功后發送數據。服務端接收到數據后從共享內存中得到長連接方式的連接對象,使用此對象向長連接客戶端發送數據。發送前將短連接對象設為長連接對象的屬性值。
5) 長連接客戶端接收到數據后返回響應數據。服務端從長連接對象的屬性中取得短連接對象,通過此對象將響應數據發送給短連接客戶端。
6) 短連接客戶端收到響應數據后,關閉連接。
3、服務端程序
3.1 長連接服務端
服務啟動
/**
 * Starts the long-connection side of the relay: a MINA NIO socket acceptor on port
 * 8002 whose sessions are handled by {@link MinaLongConnServerHandler}.
 */
public class MinaLongConnServer {
    /** TCP port that long-connection clients connect to. */
    private static final int PORT = 8002;

    /**
     * Builds the filter chain (logging first, then a UTF-8 text-line codec), installs
     * the handler, sets the read buffer size and binds the acceptor to {@link #PORT}.
     *
     * @throws IOException if binding the acceptor fails
     */
    public void start() throws IOException {
        IoAcceptor acceptor = new NioSocketAcceptor();
        acceptor.getFilterChain().addLast("logger", new LoggingFilter());
        // Fixed: the listing read "newProtocolCodecFilter" (no space after `new`),
        // which is an undefined method call rather than a constructor invocation.
        acceptor.getFilterChain().addLast("codec",
                new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8"))));
        acceptor.setHandler(new MinaLongConnServerHandler());
        acceptor.getSessionConfig().setReadBufferSize(2048);
        acceptor.bind(new InetSocketAddress(PORT));
        System.out.println("Listeningon port " + PORT);
    }
}
//消息處理
/**
 * I/O handler for the long-connection acceptor.
 *
 * <p>Every client that connects is stored in the client map returned by
 * {@code Initialization.getInstance().getClientMap()}, keyed by its IP address. Each
 * message received from such a client is forwarded to the short-connection session
 * found in the {@code "shortConnSession"} attribute of the long-connection session.
 *
 * <p>NOTE(review): the client map is declared as a plain {@code HashMap}; whether it is
 * only ever touched from one thread is not visible here - confirm before relying on it.
 */
public class MinaLongConnServerHandler extends IoHandlerAdapter {
    /** Attribute key under which the reply target (short-connection session) is stored. */
    private static final String SHORT_SESSION_KEY = "shortConnSession";

    private final Logger logger = (Logger) LoggerFactory.getLogger(getClass());

    /**
     * Logs the new connection and registers the session in the client map under the
     * client's IP address (a later connection from the same IP replaces the entry).
     */
    @Override
    public void sessionOpened(IoSession session) {
        InetSocketAddress remoteAddress = (InetSocketAddress) session.getRemoteAddress();
        String clientIp = remoteAddress.getAddress().getHostAddress();
        logger.info("LongConnect Server opened Session ID =" + String.valueOf(session.getId()));
        logger.info("接收來自客戶端 :" + clientIp + "的連接.");
        Initialization init = Initialization.getInstance();
        HashMap<String, IoSession> clientMap = init.getClientMap();
        clientMap.put(clientIp, session);
    }

    /**
     * Removes the closed session from the client map so that it is no longer offered
     * as a relay target. Added because the map previously kept closed sessions forever.
     */
    @Override
    public void sessionClosed(IoSession session) {
        HashMap<String, IoSession> clientMap = Initialization.getInstance().getClientMap();
        if (clientMap != null) {
            // Remove by value: only the entry that still points at this session goes away,
            // so a newer session registered under the same IP is left untouched.
            clientMap.values().remove(session);
        }
    }

    /**
     * Forwards the received message to the short-connection session attached to this
     * long-connection session. If no such session is attached, or it is already closed,
     * the message is logged and dropped instead of failing with a NullPointerException.
     */
    @Override
    public void messageReceived(IoSession session, Object message) {
        logger.info("Messagereceived in the long connect server..");
        String expression = message.toString();
        logger.info("Message is:" + expression);
        IoSession shortConnSession = (IoSession) session.getAttribute(SHORT_SESSION_KEY);
        if (shortConnSession == null || !shortConnSession.isConnected()) {
            logger.warn("No open short connect session for long connect session "
                    + session.getId() + "; message dropped.");
            return;
        }
        logger.info("ShortConnect Server Session ID =" + String.valueOf(shortConnSession.getId()));
        shortConnSession.write(expression);
    }

    /** Logs and immediately closes a session that was reported idle. */
    @Override
    public void sessionIdle(IoSession session, IdleStatus status) {
        logger.info("Disconnectingthe idle.");
        // disconnect an idle client
        session.close(true);
    }

    /** Logs the failure with its stack trace and immediately closes the session. */
    @Override
    public void exceptionCaught(IoSession session, Throwable cause) {
        // close the connection on exceptional situation
        logger.warn(cause.getMessage(), cause);
        session.close(true);
    }
}
/** TCP port that long-connection clients connect to. */
private static final int PORT = 8002;

/**
 * Builds the filter chain (logging first, then a UTF-8 text-line codec), installs
 * the long-connection handler, sets the read buffer size and binds the acceptor.
 *
 * @throws IOException if binding the acceptor fails
 */
public void start() throws IOException {
    IoAcceptor acceptor = new NioSocketAcceptor();
    acceptor.getFilterChain().addLast("logger", new LoggingFilter());
    // Fixed: the listing read "newProtocolCodecFilter" (no space after `new`).
    acceptor.getFilterChain().addLast("codec",
            new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8"))));
    acceptor.setHandler(new MinaLongConnServerHandler());
    acceptor.getSessionConfig().setReadBufferSize(2048);
    acceptor.bind(new InetSocketAddress(PORT));
    System.out.println("Listeningon port " + PORT);
}
}
//消息處理
/**
 * I/O handler for the long-connection acceptor.
 *
 * <p>Every client that connects is stored in the client map returned by
 * {@code Initialization.getInstance().getClientMap()}, keyed by its IP address. Each
 * message received from such a client is forwarded to the short-connection session
 * found in the {@code "shortConnSession"} attribute of the long-connection session.
 *
 * <p>NOTE(review): the client map is declared as a plain {@code HashMap}; whether it is
 * only ever touched from one thread is not visible here - confirm before relying on it.
 */
public class MinaLongConnServerHandler extends IoHandlerAdapter {
    /** Attribute key under which the reply target (short-connection session) is stored. */
    private static final String SHORT_SESSION_KEY = "shortConnSession";

    private final Logger logger = (Logger) LoggerFactory.getLogger(getClass());

    /**
     * Logs the new connection and registers the session in the client map under the
     * client's IP address (a later connection from the same IP replaces the entry).
     */
    @Override
    public void sessionOpened(IoSession session) {
        InetSocketAddress remoteAddress = (InetSocketAddress) session.getRemoteAddress();
        String clientIp = remoteAddress.getAddress().getHostAddress();
        logger.info("LongConnect Server opened Session ID =" + String.valueOf(session.getId()));
        logger.info("接收來自客戶端 :" + clientIp + "的連接.");
        Initialization init = Initialization.getInstance();
        HashMap<String, IoSession> clientMap = init.getClientMap();
        clientMap.put(clientIp, session);
    }

    /**
     * Removes the closed session from the client map so that it is no longer offered
     * as a relay target. Added because the map previously kept closed sessions forever.
     */
    @Override
    public void sessionClosed(IoSession session) {
        HashMap<String, IoSession> clientMap = Initialization.getInstance().getClientMap();
        if (clientMap != null) {
            // Remove by value: only the entry that still points at this session goes away,
            // so a newer session registered under the same IP is left untouched.
            clientMap.values().remove(session);
        }
    }

    /**
     * Forwards the received message to the short-connection session attached to this
     * long-connection session. If no such session is attached, or it is already closed,
     * the message is logged and dropped instead of failing with a NullPointerException.
     */
    @Override
    public void messageReceived(IoSession session, Object message) {
        logger.info("Messagereceived in the long connect server..");
        String expression = message.toString();
        logger.info("Message is:" + expression);
        IoSession shortConnSession = (IoSession) session.getAttribute(SHORT_SESSION_KEY);
        if (shortConnSession == null || !shortConnSession.isConnected()) {
            logger.warn("No open short connect session for long connect session "
                    + session.getId() + "; message dropped.");
            return;
        }
        logger.info("ShortConnect Server Session ID =" + String.valueOf(shortConnSession.getId()));
        shortConnSession.write(expression);
    }

    /** Logs and immediately closes a session that was reported idle. */
    @Override
    public void sessionIdle(IoSession session, IdleStatus status) {
        logger.info("Disconnectingthe idle.");
        // disconnect an idle client
        session.close(true);
    }

    /** Logs the failure with its stack trace and immediately closes the session. */
    @Override
    public void exceptionCaught(IoSession session, Throwable cause) {
        // close the connection on exceptional situation
        logger.warn(cause.getMessage(), cause);
        session.close(true);
    }
}
3.2 短連接服務端
服務啟動
/**
 * Starts the short-connection side of the relay: a MINA NIO socket acceptor on port
 * 8001 whose sessions are handled by {@link MinaShortConnServerHandler}.
 */
public class MinaShortConnServer {
    /** TCP port that short-connection clients connect to. */
    private static final int PORT = 8001;

    /**
     * Builds the filter chain (logging first, then a UTF-8 text-line codec), installs
     * the handler, configures the read buffer and a BOTH_IDLE time of 3, then binds
     * the acceptor to {@link #PORT}.
     *
     * @throws IOException if binding the acceptor fails
     */
    public void start() throws IOException {
        IoAcceptor acceptor = new NioSocketAcceptor();
        acceptor.getFilterChain().addLast("logger", new LoggingFilter());
        // Fixed: the listing read "newProtocolCodecFilter" (no space after `new`),
        // which is an undefined method call rather than a constructor invocation.
        acceptor.getFilterChain().addLast("codec",
                new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8"))));
        acceptor.setHandler(new MinaShortConnServerHandler());
        acceptor.getSessionConfig().setReadBufferSize(2048);
        // Idle sessions are reported to the handler, which closes them.
        acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 3);
        acceptor.bind(new InetSocketAddress(PORT));
        System.out.println("Listeningon port " + PORT);
    }
}
/** TCP port that short-connection clients connect to. */
private static final int PORT = 8001;

/**
 * Builds the filter chain (logging first, then a UTF-8 text-line codec), installs
 * the short-connection handler, configures the read buffer and a BOTH_IDLE time of 3,
 * then binds the acceptor.
 *
 * @throws IOException if binding the acceptor fails
 */
public void start() throws IOException {
    IoAcceptor acceptor = new NioSocketAcceptor();
    acceptor.getFilterChain().addLast("logger", new LoggingFilter());
    // Fixed: the listing read "newProtocolCodecFilter" (no space after `new`).
    acceptor.getFilterChain().addLast("codec",
            new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8"))));
    acceptor.setHandler(new MinaShortConnServerHandler());
    acceptor.getSessionConfig().setReadBufferSize(2048);
    // Idle sessions are reported to the handler, which closes them.
    acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 3);
    acceptor.bind(new InetSocketAddress(PORT));
    System.out.println("Listeningon port " + PORT);
}
}
消息處理
public class MinaShortConnServerHandler extends IoHandlerAdapter {
private final Logger logger = (Logger) LoggerFactory.getLogger(getClass());
@Override
public void sessionOpened(IoSession session) {
InetSocketAddress remoteAddress = (InetSocketAddress)session.getRemoteAddress();
logger.info(remoteAddress.getAddress().getHostAddress());
logger.info(String.valueOf(session.getId()));
}
@Override
public void messageReceived(IoSession session, Object message) {
logger.info("Messagereceived in the short connect server
");
String expression = message.toString();
Initialization init = Initialization.getInstance();
HashMap<String, IoSession> clientMap =init.getClientMap();
if (clientMap == null || clientMap.size() == 0) {
session.write("error");
} else {
IoSession longConnSession = null;
Iterator<String> iterator =clientMap.keySet().iterator();
String key = "";
while (iterator.hasNext()) {
key = iterator.next();
longConnSession = clientMap.get(key);
}
logger.info("ShortConnect Server Session ID :"+String.valueOf(session.getId()));
logger.info("LongConnect Server Session ID :"+String.valueOf(longConnSession.getId()));
longConnSession.setAttribute("shortConnSession",session);
longConnSession.write(expression);
}
}
@Override
public void sessionIdle(IoSession session, IdleStatus status) {
logger.info("Disconnectingthe idle.");
// disconnect an idle client
session.close(true);
}
@Override
public void exceptionCaught(IoSession session, Throwable cause) {
// close the connection onexceptional situation
logger.warn(cause.getMessage(), cause);
session.close(true);
}
}
private final Logger logger = (Logger) LoggerFactory.getLogger(getClass());
@Override
public void sessionOpened(IoSession session) {
InetSocketAddress remoteAddress = (InetSocketAddress)session.getRemoteAddress();
logger.info(remoteAddress.getAddress().getHostAddress());
logger.info(String.valueOf(session.getId()));
}
@Override
public void messageReceived(IoSession session, Object message) {
logger.info("Messagereceived in the short connect server

String expression = message.toString();
Initialization init = Initialization.getInstance();
HashMap<String, IoSession> clientMap =init.getClientMap();
if (clientMap == null || clientMap.size() == 0) {
session.write("error");
} else {
IoSession longConnSession = null;
Iterator<String> iterator =clientMap.keySet().iterator();
String key = "";
while (iterator.hasNext()) {
key = iterator.next();
longConnSession = clientMap.get(key);
}
logger.info("ShortConnect Server Session ID :"+String.valueOf(session.getId()));
logger.info("LongConnect Server Session ID :"+String.valueOf(longConnSession.getId()));
longConnSession.setAttribute("shortConnSession",session);
longConnSession.write(expression);
}
}
@Override
public void sessionIdle(IoSession session, IdleStatus status) {
logger.info("Disconnectingthe idle.");
// disconnect an idle client
session.close(true);
}
@Override
public void exceptionCaught(IoSession session, Throwable cause) {
// close the connection onexceptional situation
logger.warn(cause.getMessage(), cause);
session.close(true);
}
}
4、客戶端程序
4.1長連接客戶端
使用java.net.Socket來實現向服務端建立連接。Socket建立后一直保持連接,從服務端接收到數據包后直接將原文返回。
/**
 * Plain-socket client that keeps one TCP connection open and echoes every chunk of
 * bytes it receives straight back to the server.
 */
public class TcpKeepAliveClient {
    /** Connect timeout and per-read timeout, in milliseconds. */
    private static final int TIMEOUT_MILLIS = 50 * 1000;
    /** Size of the receive buffer; larger packets are echoed in several chunks. */
    private static final int BUFFER_SIZE = 2048;

    private final String ip;
    private final int port;

    /**
     * @param ip   host name or address of the server
     * @param port TCP port of the server
     */
    public TcpKeepAliveClient(String ip, int port) {
        this.ip = ip;
        this.port = port;
    }

    /**
     * Connects to the server and echoes incoming data until the server closes the
     * connection, then returns. The socket is always closed on exit.
     *
     * <p>Compared with the previous version this method blocks in {@code read} instead
     * of spinning on {@code available()}, honours the number of bytes actually read,
     * notices end-of-stream, and reports I/O failures to the caller instead of printing
     * and swallowing them.
     *
     * @throws IOException if connecting, reading or writing fails
     */
    public void receiveAndSend() throws IOException {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(ip, port), TIMEOUT_MILLIS);
            socket.setSoTimeout(TIMEOUT_MILLIS);
            System.out.println("TcpKeepAliveClientnew ");

            InputStream input = socket.getInputStream();
            OutputStream output = socket.getOutputStream();
            byte[] buffer = new byte[BUFFER_SIZE];
            while (true) {
                int count;
                try {
                    count = input.read(buffer);
                } catch (java.net.SocketTimeoutException idle) {
                    // A quiet period is normal for a keep-alive connection: keep waiting.
                    continue;
                }
                if (count < 0) {
                    // Server closed the connection.
                    return;
                }
                System.out.println("TcpKeepAliveClientsend date :"
                        + new String(buffer, 0, count, java.nio.charset.StandardCharsets.UTF_8));
                output.write(buffer, 0, count);
                output.flush();
            }
        }
    }

    /** Connects to 127.0.0.1:8002 and echoes until the server disconnects. */
    public static void main(String[] args) throws Exception {
        TcpKeepAliveClient client = new TcpKeepAliveClient("127.0.0.1", 8002);
        client.receiveAndSend();
    }
}
/** Connect timeout and per-read timeout, in milliseconds. */
private static final int TIMEOUT_MILLIS = 50 * 1000;
/** Size of the receive buffer; larger packets are echoed in several chunks. */
private static final int BUFFER_SIZE = 2048;

private final String ip;
private final int port;

/**
 * @param ip   host name or address of the server
 * @param port TCP port of the server
 */
public TcpKeepAliveClient(String ip, int port) {
    this.ip = ip;
    this.port = port;
}

/**
 * Connects to the server and echoes incoming data until the server closes the
 * connection, then returns. The socket is always closed on exit.
 *
 * <p>Compared with the previous version this method blocks in {@code read} instead
 * of spinning on {@code available()}, honours the number of bytes actually read,
 * notices end-of-stream, and reports I/O failures to the caller instead of printing
 * and swallowing them.
 *
 * @throws IOException if connecting, reading or writing fails
 */
public void receiveAndSend() throws IOException {
    try (Socket socket = new Socket()) {
        socket.connect(new InetSocketAddress(ip, port), TIMEOUT_MILLIS);
        socket.setSoTimeout(TIMEOUT_MILLIS);
        System.out.println("TcpKeepAliveClientnew ");

        InputStream input = socket.getInputStream();
        OutputStream output = socket.getOutputStream();
        byte[] buffer = new byte[BUFFER_SIZE];
        while (true) {
            int count;
            try {
                count = input.read(buffer);
            } catch (java.net.SocketTimeoutException idle) {
                // A quiet period is normal for a keep-alive connection: keep waiting.
                continue;
            }
            if (count < 0) {
                // Server closed the connection.
                return;
            }
            System.out.println("TcpKeepAliveClientsend date :"
                    + new String(buffer, 0, count, java.nio.charset.StandardCharsets.UTF_8));
            output.write(buffer, 0, count);
            output.flush();
        }
    }
}

/** Connects to 127.0.0.1:8002 and echoes until the server disconnects. */
public static void main(String[] args) throws Exception {
    TcpKeepAliveClient client = new TcpKeepAliveClient("127.0.0.1", 8002);
    client.receiveAndSend();
}
}
4.2短連接客戶端
服務啟動
/**
 * Short-connection client: opens ten connections to 127.0.0.1:8001 one after another,
 * sends the loop counter on each, waits for the session to close and prints the value
 * stored in the session's {@code "result"} attribute.
 */
public class MinaShortClient {
    /** TCP port of the short-connection server. */
    private static final int PORT = 8001;

    /**
     * Runs the ten request/response round trips.
     *
     * @param args unused
     */
    public static void main(String[] args) throws IOException, InterruptedException {
        IoConnector connector = new NioSocketConnector();
        try {
            connector.getSessionConfig().setReadBufferSize(2048);
            connector.getFilterChain().addLast("logger", new LoggingFilter());
            // Fixed: the listing read "newProtocolCodecFilter" (no space after `new`).
            connector.getFilterChain().addLast("codec",
                    new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8"))));
            connector.setHandler(new MinaShortClientHandler());
            for (int i = 1; i <= 10; i++) {
                ConnectFuture future = connector.connect(new InetSocketAddress("127.0.0.1", PORT));
                future.awaitUninterruptibly();
                IoSession session = future.getSession();
                session.write(i);
                // The handler closes the session once the reply has been stored.
                session.getCloseFuture().awaitUninterruptibly();
                System.out.println("result=" + session.getAttribute("result"));
            }
        } finally {
            // Release the connector even when a connection attempt or write fails.
            connector.dispose();
        }
    }
}
private static final int PORT = 8001;
public static void main(String[] args) throws IOException,InterruptedException {
IoConnector connector = new NioSocketConnector();
connector.getSessionConfig().setReadBufferSize(2048);
connector.getFilterChain().addLast("logger", new LoggingFilter());
connector.getFilterChain().addLast("codec", newProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8"))));
connector.setHandler(new MinaShortClientHandler());
for (int i = 1; i <= 10; i++) {
ConnectFuture future = connector.connect(new InetSocketAddress("127.0.0.1", PORT));
future.awaitUninterruptibly();
IoSession session =future.getSession();
session.write(i);
session.getCloseFuture().awaitUninterruptibly();
System.out.println("result=" + session.getAttribute("result"));
}
connector.dispose();
}
}
消息處理
/**
 * I/O handler for the short-connection client: stores the first reply in the session's
 * {@code "result"} attribute and then closes the session.
 */
public class MinaShortClientHandler extends IoHandlerAdapter {
    private final Logger logger = (Logger) LoggerFactory.getLogger(getClass());

    /** Creates a handler; no state is initialised. */
    public MinaShortClientHandler() {
    }

    /** Intentionally does nothing when a session opens. */
    @Override
    public void sessionOpened(IoSession session) {
    }

    /**
     * Logs the reply, stores its string form under the {@code "result"} attribute and
     * immediately closes the session.
     */
    @Override
    public void messageReceived(IoSession session, Object message) {
        logger.info("Messagereceived in the client..");
        logger.info("Message is:" + message.toString());
        session.setAttribute("result", message.toString());
        session.close(true);
    }

    /**
     * Logs the failure and immediately closes the session. The cause used to be
     * discarded silently, which made failed round trips impossible to diagnose.
     */
    @Override
    public void exceptionCaught(IoSession session, Throwable cause) {
        logger.warn(cause.getMessage(), cause);
        session.close(true);
    }
}
private final Logger logger = (Logger) LoggerFactory.getLogger(getClass());

/** Creates a handler; no state is initialised. */
public MinaShortClientHandler() {
}

/** Intentionally does nothing when a session opens. */
@Override
public void sessionOpened(IoSession session) {
}

/**
 * Logs the reply, stores its string form under the "result" attribute and
 * immediately closes the session.
 */
@Override
public void messageReceived(IoSession session, Object message) {
    logger.info("Messagereceived in the client..");
    logger.info("Message is:" + message.toString());
    session.setAttribute("result", message.toString());
    session.close(true);
}

/**
 * Logs the failure and immediately closes the session. The cause used to be
 * discarded silently, which made failed round trips impossible to diagnose.
 */
@Override
public void exceptionCaught(IoSession session, Throwable cause) {
    logger.warn(cause.getMessage(), cause);
    session.close(true);
}
}
5、總結
通過本文中的例子,Apache Mina在服務端可實現TCP協議長連接和短連接。在客戶端只實現了短連接模式,長連接模式也是可以實現的(在本文中還是采用傳統的java Socket方式)。兩個服務端之間通過共享內存的方式來傳遞連接對象,也許有更好的實現方式。
posted on 2013-05-11 21:56 paulwong 閱讀(558) 評論(0) 編輯 收藏 所屬分類: MINA