http://www.advjava.com/java_io/001/006.html
最近學習了非阻塞IO(NIO)。之所以學它,是因為厭煩了用阻塞IO開發并行處理時所導致的臃腫服務端:對于每個客戶連接都要產生一個線程來處理(當然你可以不這樣實現,但我的前提是開發并行處理)。下面是我的源碼。因為是在DOS命令行下測試的,所以要改寫為GUI的話還有很多東西要改,這也是我下一個要征服的對象,我已經迫不及待了。下面是我花了三個晚上學習并編寫的非阻塞聊天室(供交流學習用):
客戶端:
import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.io.BufferedReader;
import java.io.*;
import java.lang.Thread;
import java.nio.charset.*;
import java.nio.charset.CharsetDecoder;
public class ChatClient{
?
?private InetSocketAddress address = new InetSocketAddress("localhost",13);
?private SocketChannel client = null;
?private String user = null;
?private String pass = null;
?private BufferedReader in = null;
?private Thread t = null;
?
?public ChatClient(){
? try{
?? client = SocketChannel.open();
?? System.out.println("connecting...");
??
?? client.connect(address);
?? System.out.println("connected with "+address.getHostName());
?? client.configureBlocking(false);
? }catch(IOException ex){
?? ex.printStackTrace();
?? System.exit(-1);
? }
? this.start();
?}
?
?public void start(){
? this.receiveMessage();
? this.sendMessage();?
?}
?
?public void sendMessage(){
? try{
?? in = new BufferedReader(new InputStreamReader(System.in));
?? System.out.println("Input the Info then check it out on the server");
?? System.out.print("Your Name:");
?? user = in.readLine();
?? System.out.println("Password:");
?? pass = in.readLine();
??
?? ByteBuffer buffer = ByteBuffer.allocate(50);
?? String message= new String("LOGIN:"+user+"&"+pass);
?? buffer = ByteBuffer.wrap(message.getBytes());
?? while(buffer.hasRemaining()&client.write(buffer)!=-1);
?? System.out.println(message+" has been send");
??
??
?? buffer.flip();
?? Charset charset = Charset.forName("gb2312");
?? CharsetDecoder decoder = charset.newDecoder();
?? CharBuffer charBuffer = decoder.decode(buffer);
?? //System.out.println("receive:"+charBuffer+" length:"+charBuffer.limit());
? }catch(IOException ex){
?? ex.printStackTrace();
? }
?
? this.waitFor(2000);
?
? System.out.println("WELCOME TO THE KING 'S CHAT ROOM!");
? System.out.println("Input the Info(exit is to leave out)");
? while(true){
?? System.out.print(">");
?
?? ByteBuffer buffer = ByteBuffer.allocate(100);
?? in = new BufferedReader(new InputStreamReader(System.in));
?? try{
??? String read=in.readLine();
??? if(read.equals("exit")){
???? break;
??? }??
??? String message1="SENTO:"+read;
??? buffer = ByteBuffer.wrap(message1.getBytes());
?? // buffer.flip();
??? System.out.println("before");
??? while(buffer.hasRemaining()&client.write(buffer)!=-1);
?? // buffer.flip();
??? System.out.println(message1+" has been send");
??? this.waitFor(500);
?? }catch(IOException ex){
??? ex.printStackTrace();
?? }
? }
?
? System.out.println("Welcome to use this soft!---King");
? System.exit(-1);
?
?}
?
?public void waitFor(long time){
? try{
?? Thread.sleep(time);????
? }catch(Exception ex){
?? ex.printStackTrace();
? }
?}
?
?public void receiveMessage(){
? t=new ReceiveThread(client);
? t.start();
?}
?
?public static void main(String[]args){
? ChatClient cc=new ChatClient();
?
?}
?
?class ReceiveThread extends Thread{
? SocketChannel client =null;
? ByteBuffer buffer=ByteBuffer.allocate(50);
? private boolean val=true;
?
? public ReceiveThread(SocketChannel client){
?? this.client = client;
? }
?
? public void run(){
?? while(val){
??? try{
???? while (client.read(buffer) > 0){
????? buffer.flip();
????? String result = decode(buffer);
????? System.out.println(">(back)"+result);
????? buffer.flip();?
???? }
??? }catch(IOException ex){
???? ex.printStackTrace();
???? return;
??? }
??
?? }
? }
?}
?
?public String decode(ByteBuffer buffer){
? Charset charset=null;
? CharsetDecoder decoder=null;
? CharBuffer charBuffer=null;
? try{
?? charset= Charset.forName("gb2312");
??? decoder= charset.newDecoder();
??? charBuffer= decoder.decode(buffer);
?? return charBuffer.toString();
? }catch(Exception ex){
?? ex.printStackTrace();
?? return "";
? }
?
?}
}
注意:可以多個客戶同時進行交流。程序會要求輸入驗證信息,但由于時間關系,后臺對所有用戶都按合法用戶給予回饋。
服務端:
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.net.ServerSocket;
import java.net.InetSocketAddress;
import java.nio.channels.Selector;
import java.nio.channels.SelectionKey;
import java.io.IOException;
import java.util.Iterator;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.nio.charset.*;
import java.nio.*;
public class ChatServer {
?
?private int port = 13;
?private Selector selector;
?private ServerSocketChannel ssc;
?private ServerSocket server;
?private InetSocketAddress address;
?private ArrayList connectKey=new ArrayList();
?
?public ChatServer(){
? //initServer?
? try{
?? ssc=ServerSocketChannel.open();
?? server=ssc.socket();
?? address = new InetSocketAddress(port);
?? server.bind(address);
?? selector=Selector.open();
?? ssc.configureBlocking(false);
?? ssc.register(selector,SelectionKey.OP_ACCEPT);
?? System.out.println("============================================================");
?? System.out.println("=??????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????? =");
?? System.out.println("=??????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????? =");
?? System.out.println("=???????????????????????????????????? 水底沙聊天室-version1.0??????????????????????????????????????????????????? =");
?? System.out.println("=??????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????? =");
?? System.out.println("=??????????????????????????????????????????????????????????????????????????????????????????????? QQ:247095340(交流) =");
?? System.out.println("============================================================");??
?? System.out.println("Listening the port 13...");?
? }catch(IOException ex){
?? ex.printStackTrace();
?? System.exit(-1);
? }
?}
?
?public void startServer() throws IOException{
? while(true){
?? int i=selector.select();
?? //System.out.print(i);
?? Iterator keys = selector.selectedKeys().iterator();
??
?? while(keys.hasNext()){
??? SelectionKey key = (SelectionKey)keys.next();
??? keys.remove();
??? try{???
???? if(key.isAcceptable()){
????? ServerSocketChannel ssc=(ServerSocketChannel)key.channel();????
????? SocketChannel channel = ssc.accept();//return null if there's no request
????? System.out.println(channel+" has accepted");
????? channel.configureBlocking(false);
????? SelectionKey clientKey=channel.register(selector,SelectionKey.OP_READ);????
???? }//else
???? if(key.isWritable()){
????? SocketChannel channel = (SocketChannel)key.channel();
????? ByteBuffer buffer = (ByteBuffer)key.attachment();
?????
????? if(buffer!=null){??
?????? key.attach(null);//avoid the return twice
??????
?????? //check info:the login or the message
?????? //buffer.flip();
?????? String checkBuffer = this.decode(buffer);
?????? System.out.println("write:"+checkBuffer);
??????
?????? if(checkBuffer.equals("LOGIN:OK")){
??????? //return LOGIN:OK then add into the connectKey array!
??????? System.out.println("ok"+buffer);
??????? buffer.flip();
??????? //while(buffer.hasRemaining()&channel.write(buffer)!=-1);
??????? channel.write(buffer);
??????? key.interestOps(SelectionKey.OP_READ|SelectionKey.OP_WRITE);?????
??????? connectKey.add(key);//add to the connectKey array
??????? System.out.println("here");
?????? }else if(checkBuffer.equals("LOGIN:ERROR")){
??????? //return LOGIN:ERROR the client should close the channel
??????? //warning:method:key.channel();
??????? //Returns the channel for which this key was created.
??????? // This method will continue to return the channel even after the key is cancelled.
??????? while(buffer.hasRemaining()&channel.write(buffer)!=-1);
??????? key.cancel();
?????? }else //if(checkBuffer.indexOf("SENTO:")!=-1){
?????? {
???????
??????? //return the message to everyone
?????? // while(buffer.hasRemaining()&channel.write(buffer)!=-1);
???????
??????? System.out.println("sento"+buffer);
??????? buffer.flip();
??????? channel.write(buffer);
??????? System.out.println("send over");
?????? }
????? }
???? }//else
???? if(key.isReadable()){
????? SocketChannel channel = (SocketChannel)key.channel();
????? ByteBuffer buffer=ByteBuffer.allocate(50);
????? System.out.println("read...");
?????
????? channel.read(buffer);
?
????? buffer.flip();
????? String checkBuffer = this.decode(buffer);
????? System.out.println("read:"+checkBuffer);
????
????? //while(buffer.hasRemaining()&&channel.read(buffer)!=-1);
?????
????? //check the buffer
????? //buffer.flip();
?????
????? //String checkBuffer = this.decode(buffer);
???? // System.out.println("read:"+checkBuffer);
?????
????
????? if(checkBuffer.startsWith("LOGIN:")){
?????? //get info of the user & pass then check for it,return feedback!
?????? //the format is LOGIN:user&pass
?????? int p1=checkBuffer.length();
?????? int p2=checkBuffer.indexOf("&");
??????
?????? String user=checkBuffer.substring(6,p2);
?????? String pass=checkBuffer.substring(p2+1,p1);
?????? System.out.println(user+pass);
??????
?????? //todo check from the database!!!
?????? //assume the user is legal
?????? ByteBuffer feedback = ByteBuffer.allocate(20);
?????? feedback=ByteBuffer.wrap("LOGIN:OK".getBytes());
?????? key.interestOps(SelectionKey.OP_WRITE);
?????? key.attach(feedback);?????
????? }else if(checkBuffer.startsWith("SENTO:")){
?????? String message = checkBuffer.substring(6);
?????? System.out.println("sentto:"+message);
?????? ByteBuffer buffer1 = ByteBuffer.allocate(50);
?????? buffer1=ByteBuffer.wrap(message.getBytes());
?????? Iterator it = connectKey.iterator();
?????? //key.interestOps(SelectionKey.OP_WRITE);
?????? while(it.hasNext()){
??????? ((SelectionKey)it.next()).attach(buffer1.duplicate());
?????? }
?????? System.out.println("here1");
?????? //for(int i=0;i<connectKey.add.;i++){
??????? //connectKey[i].attach(buffer.duplicate());
?????? //}
????? }
???? }
??? }catch(IOException ex){
???? key.cancel();
???? //System.exit(-1);
???? try{
????? key.channel().close();
???? }catch(IOException cex){
???? }
??? }
?? }
? }
?}
?
?public String decode(ByteBuffer buffer){
? Charset charset=null;
? CharsetDecoder decoder=null;
? CharBuffer charBuffer=null;
? try{
?? charset= Charset.forName("gb2312");
??? decoder= charset.newDecoder();
??? charBuffer= decoder.decode(buffer);
?? return charBuffer.toString();
? }catch(Exception ex){
?? ex.printStackTrace();
?? return "";
? }
?
?}
?
?public static void main(String []args){
? ChatServer cs = new ChatServer();
? try{
?? cs.startServer();
? }catch(IOException ex){
?? ex.printStackTrace();
?? System.exit(-1);
? }
?}
}
注意:假如客戶端強制斷開與服務端的連接,服務器的登錄用戶列表里仍會保留它注冊的SelectionKey,這是一個已知問題。解決辦法其實很簡單:在對通道進行寫入時,如果通道已經被關閉,用try/catch語句捕獲異常,并把該SelectionKey從列表中移除即可。
上面就是完整的程序。其實之前我都是用阻塞socket去完成這類工作的;由于在基于SWT實現界面時遇到很多SWT線程問題,GUI版本后期再共享給大家。當然自己也在不斷學習中!——King
只有注冊用戶登錄后才能發表評論。 | ||
![]() |
||
網站導航:
博客園
IT新聞
Chat2DB
C++博客
博問
管理
|
||
相關文章:
|
||