轉(zhuǎn):用Java Socket開(kāi)發(fā)小型服務(wù)器,支持上千個(gè)并發(fā)(下)
6 創(chuàng)建多線程Socket服務(wù)器
前面的示例教給您基礎(chǔ)知識(shí),但并不能令您更深入。如果您到此就停止了,那么您一次只能處理一臺(tái)客戶機(jī)。原因是handleConnection()是一個(gè)阻塞方法。只有當(dāng)它完成了對(duì)當(dāng)前連接的處理時(shí),服務(wù)器才能接受另一個(gè)客戶機(jī)。在多數(shù)時(shí)候,您將需要(也有必要)一個(gè)多線程服務(wù)器。 創(chuàng)建 MultithreadedRemoteFileServer 類 import java.io.*; import java.net.*; public class MultithreadedRemoteFileServer { int listenPort; public MultithreadedRemoteFileServer(int listenPort) { this.listenPort=listenPort; } //允許客戶機(jī)連接到服務(wù)器,等待客戶機(jī)請(qǐng)求 public void acceptConnections() { try { ServerSocket server = new ServerSocket(listenPort, 5); Socket incomingConnection = null; while(true) { incomingConnection = server.accept(); handleConnection(incomingConnection); } } catch(BindException e) { System.out.println("Unable to bind to port "+listenPort); } catch(IOException e) { System.out.println("Unable to instantiate a ServerSocket on port: "+listenPort); } } //與客戶機(jī)Socket交互以將客戶機(jī)所請(qǐng)求的文件的內(nèi)容發(fā)送到客戶機(jī) public void handleConnection(Socket connectionToHandle) { new Thread(new ConnectionHandler(connectionToHandle)).start(); } public static void main(String args[]) { MultithreadedRemoteFileServer server = new MultithreadedRemoteFileServer(1001); server.acceptConnections(); } } 這里我們實(shí)現(xiàn)改動(dòng)過(guò)acceptConnections()方法,它將創(chuàng)建一個(gè)能夠處理待發(fā)請(qǐng)求的ServerSocket,并告訴ServerSocket接受連接。 新的 server 仍然需要acceptConnections(),所以這些代碼實(shí)際上是一樣的。突出顯示的行表示一個(gè)重大的不同。對(duì)這個(gè)多線程版,我們現(xiàn)在可以指定客戶機(jī)請(qǐng)求的最大數(shù)目,這些請(qǐng)求都能在實(shí)例化ServerSocket期間處于待發(fā)狀態(tài)。如果我們沒(méi)有指定客戶機(jī)請(qǐng)求的最大數(shù)目,則我們假設(shè)使用缺省值50。 這里是它的工作機(jī)制。假設(shè)我們指定待發(fā)數(shù)(backlog 值)是5并且有五臺(tái)客戶機(jī)請(qǐng)求連接到我們的服務(wù)器。我們的服務(wù)器將著手處理第一個(gè)連接,但處理該連接需要很長(zhǎng)時(shí)間。由于我們的待發(fā)值是5,所以我們一次可以放五個(gè)請(qǐng)求到隊(duì)列中。我們正在處理一個(gè),所以這意味著還有其它五個(gè)正在等待。等待的和正在處理的一共有六個(gè)。當(dāng)我們的服務(wù)器仍忙于接受一號(hào)連接(記住隊(duì)列中還有 2?6 號(hào))時(shí),如果有第七個(gè)客戶機(jī)提出連接申請(qǐng),那么,該第七個(gè)客戶機(jī)將遭到拒絕。我們將在帶有連接池服務(wù)器示例中說(shuō)明如何限定能同時(shí)連接的客戶機(jī)數(shù)目。 處理連接: public void handleConnection(Socket connectionToHandle) { new Thread(new ConnectionHandler(connectionToHandle)).start(); } 我們對(duì)RemoteFileServer所做的大改動(dòng)就體現(xiàn)在這個(gè)方法上。我們?nèi)匀辉诜?wù)器接受一個(gè)連接之后調(diào)用handleConnection(),但現(xiàn)在我們把該Socket傳遞給ConnectionHandler的一個(gè)實(shí)例,它是 Runnable的。我們用ConnectionHandler創(chuàng)建一個(gè)新 Thread 并啟動(dòng)它。ConnectionHandler的run()方法包Socket讀/寫(xiě)和讀File的代碼,這些代碼原來(lái)在RemoteFileServer的handleConnection()中。 創(chuàng)建 ConnectionHandler 類 import java.io.*; import java.net.*; public class ConnectionHandler implements Runnable { protected Socket socketToHandle; public ConnectionHandler(Socket socketToHandle) { this.socketToHandle=socketToHandle; } public void run() { try { PrintWriter streamWriter = new PrintWriter(socketToHandle.getOutputStream()); BufferedReader streamReader = new BufferedReader(new InputStreamReader(socketToHandle.getInputStream())); String fileToRead = streamReader.readLine(); BufferedReader fileReader = new BufferedReader(new FileReader(fileToRead)); String line =null; while((line=fileReader.readLine())!=null) { streamWriter.println(line); } fileReader.close(); streamWriter.close(); streamReader.close(); } catch(Exception e) { System.out.println("Error handling a client: "+e); e.printStackTrace(); } } } 這個(gè)助手類相當(dāng)簡(jiǎn)單。跟我們到目前為止的其它類一樣,我們導(dǎo)入java.net和java.io。該類只有一個(gè)實(shí)例變量socketToHandle,它保存由該實(shí)例處理的Socket。 類的構(gòu)造器用一個(gè)Socket實(shí)例作參數(shù)并將它賦給socketToHandle。 請(qǐng)注意該類實(shí)現(xiàn)了Runnable接口。實(shí)現(xiàn)這個(gè)接口的類都必須實(shí)現(xiàn)run()方法。這里我們實(shí)現(xiàn)run()方法,它將攫取我們的連接的流,用它來(lái)讀寫(xiě)該連接,并在任務(wù)完成之后關(guān)閉它。ConnectionHandler的run()方法所做的事情就是RemoteFileServer上的handleConnection()所做的事情。首先,我們把InputStream和OutputStream分別包裝(用Socket的getOutputStream()和 getInputStream())進(jìn)BufferedReader和PrintWriter。然后我們用這些代碼逐行地讀目標(biāo)文件: PrintWriter streamWriter = new PrintWriter(socketToHandle.getOutputStream()); BufferedReader streamReader = new BufferedReader(new InputStreamReader(socketToHandle.getInputStream())); String fileToRead = streamReader.readLine(); BufferedReader fileReader = new BufferedReader(new FileReader(fileToRead)); String line =null; while((line=fileReader.readLine())!=null) { streamWriter.println(line); } 請(qǐng)記住我們應(yīng)該從客戶機(jī)獲取一條有效的文件路徑,這樣用該路徑名構(gòu)造一個(gè)新File,把它包裝進(jìn)FileReader以處理讀文件的操作,然后把它包裝進(jìn)BufferedReader以讓我們逐行地讀該文件。我們while循環(huán)中調(diào)用BufferedReader上的readLine()直到不再有要讀的行。請(qǐng)記注,對(duì)readLine()的調(diào)用將造成阻塞,直到有字節(jié)來(lái)到為止。我們獲取一些字節(jié)之后就把它們放到本地的line變量中,然后寫(xiě)出到客戶機(jī)上。完成讀寫(xiě)操作之后,我們關(guān)閉打開(kāi)的流。 總結(jié)一下多線程服務(wù)器 讓我們回顧一下創(chuàng)建和使用“多線程版”的服務(wù)器的步驟: 1. 修改 acceptConnections() 以用缺省為 50(或任何您想要的大于 1 的指定數(shù)字)實(shí)例化 ServerSocket。 2. 修改 ServerSocket 的 handleConnection() 以用 ConnectionHandler 的一個(gè)實(shí)例生成一個(gè)新的 Thread。 3. 借用 RemoteFileServer 的 handleConnection() 方法的代碼實(shí)現(xiàn) ConnectionHandler 類。 7 創(chuàng)建帶有連接池的Socket服務(wù)器 我們現(xiàn)在已經(jīng)擁有的 MultithreadedServer 每當(dāng)有客戶機(jī)申請(qǐng)一個(gè)連接時(shí)都在一個(gè)新Thread中創(chuàng)建一個(gè)新ConnectionHandler。這意味著可能有一捆Thread“躺”在我們周圍。而且創(chuàng)建Thread的系統(tǒng)開(kāi)銷并不是微不足道的。如果性能成為了問(wèn)題(也請(qǐng)不要事到臨頭才意識(shí)到它),更高效地處理我們的服務(wù)器是件好事。那么,我們?nèi)绾胃咝У毓芾矸?wù)器端呢?我們可以維護(hù)一個(gè)進(jìn)入的連接池,一定數(shù)量的ConnectionHandler將為它提供服務(wù)。這種設(shè)計(jì)能帶來(lái)以下好處: • 它限定了允許同時(shí)連接的數(shù)目。 • 我們只需啟動(dòng)ConnectionHandler Thread一次。 幸運(yùn)的是,跟在我們的多線程示例中一樣,往代碼中添加“池”不需要來(lái)一個(gè)大改動(dòng)。事實(shí)上,應(yīng)用程序的客戶機(jī)端根本就不受影響。在服務(wù)器端,我們?cè)诜?wù)器啟動(dòng)時(shí)創(chuàng)建一定數(shù)量的 ConnectionHandler,我們把進(jìn)入的連接放入“池”中并讓ConnectionHandler打理剩下的事情。這種設(shè)計(jì)中有很多我們不打算討論的可能存在的技巧。例如,我們可以通過(guò)限定允許在“池”中建立的連接的數(shù)目來(lái)拒絕客戶機(jī)。 請(qǐng)注意:我們將不會(huì)再次討論acceptConnections()。這個(gè)方法跟前面示例中的完全一樣。它無(wú)限循環(huán)地調(diào)用ServerSocket上的 accept() 并把連接傳遞到handleConnection()。 創(chuàng)建 PooledRemoteFileServer 類 import java.io.*; import java.net.*; import java.util.*; public class PooledRemoteFileServer { protected int maxConnections; protected int listenPort; protected ServerSocket serverSocket; public PooledRemoteFileServer(int aListenPort, int maxConnections) { listenPort= aListenPort; this.maxConnections = maxConnections; } public void acceptConnections() { try { ServerSocket server = new ServerSocket(listenPort, 5); Socket incomingConnection = null; while(true) { incomingConnection = server.accept(); handleConnection(incomingConnection); } } catch(BindException e) { System.out.println(""); } catch(IOException e) { System.out.println(""+listenPort); } } protected void handleConnection(Socket connectionToHandle) { PooledConnectionHandler.processRequest(connectionToHandle); } public void setUpHandlers() { for(int i=0; i<maxConnections; i++) { PooledConnectionHandler currentHandler = new PooledConnectionHandler(); new Thread(currentHandler, "Handler " + i).start(); } } public static void main(String args[]) { PooledRemoteFileServer server = new PooledRemoteFileServer(1001, 3); server.setUpHandlers(); server.acceptConnections(); } } 請(qǐng)注意一下您現(xiàn)在應(yīng)該熟悉了的 import 語(yǔ)句。我們給類以下實(shí)例變量以保存: • 我們的服務(wù)器能同時(shí)處理的活動(dòng)客戶機(jī)連接的最大數(shù)目 • 進(jìn)入的連接的偵聽(tīng)端口(我們沒(méi)有指定缺省值,但如果您想這樣做,并不會(huì)受到限制) • 將接受客戶機(jī)連接請(qǐng)求的 ServerSocket 類的構(gòu)造器用的參數(shù)是偵聽(tīng)端口和連接的最大數(shù)目 我們的類有一個(gè) main() 方法和三個(gè)其它方法。稍后我們將探究這些方法的細(xì)節(jié)。現(xiàn)在只須知道setUpHandlers()創(chuàng)建數(shù)目為maxConnections的大量PooledConnectionHandler,而其它兩個(gè)方法則與我們前面已經(jīng)看到的相似:acceptConnections()在ServerSocket上偵聽(tīng)傳入的客戶機(jī)連接,而handleConnection則在客戶機(jī)連接一旦被建立后就實(shí)際處理它。 實(shí)現(xiàn) main() 這里我們實(shí)現(xiàn)需作改動(dòng)的main()方法,該方法將創(chuàng)建能夠處理給定數(shù)目的客戶機(jī)連接的PooledRemoteFileServer,并告訴它接受連接: public static void main(String args[]) { PooledRemoteFileServer server = new PooledRemoteFileServer(1001, 3); server.setUpHandlers(); server.acceptConnections(); } 我們的main()方法很簡(jiǎn)單。我們實(shí)例化一個(gè)新的PooledRemoteFileServer,它將通過(guò)調(diào)用setUpHandlers()來(lái)建立三個(gè)PooledConnectionHandler。一旦服務(wù)器就緒,我們就告訴它acceptConnections()。 建立連接處理程序 public void setUpHandlers() { for(int i=0; i<maxConnections; i++) { PooledConnectionHandler currentHandler = new PooledConnectionHandler(); new Thread(currentHandler, "Handler " + i).start(); } } setUpHandlers()方法創(chuàng)建maxConnections(例如 3)個(gè)PooledConnectionHandler并在新Thread中激活它們。用實(shí)現(xiàn)了Runnable的對(duì)象來(lái)創(chuàng)建Thread使我們可以在Thread調(diào)用start()并且可以期望在Runnable上調(diào)用了run()。換句話說(shuō),我們的PooledConnectionHandler將等著處理進(jìn)入的連接,每個(gè)都在它自己的Thread中進(jìn)行。我們?cè)谑纠兄粍?chuàng)建三個(gè)Thread,而且一旦服務(wù)器運(yùn)行,這就不能被改變。 處理連接 這里我們實(shí)現(xiàn)需作改動(dòng)的handleConnections()方法,它將委派PooledConnectionHandler處理連接: protected void handleConnection(Socket connectionToHandle) { PooledConnectionHandler.processRequest(connectionToHandle); } 我們現(xiàn)在叫 PooledConnectionHandler 處理所有進(jìn)入的連接(processRequest() 是一個(gè)靜態(tài)方法)。 創(chuàng)建 PooledRemoteFileServer 類 import java.io.*; import java.net.*; import java.util.*; public class PooledConnectionHandler implements Runnable { protected Socket connection; protected static List pool = new LinkedList(); public PooledConnectionHandler() {} public void handleConnection() { try { PrintWriter streamWriter = new PrintWriter(connection.getOutputStream()); BufferedReader streamReader = new BufferedReader(new InputStreamReader(connection.getInputStream())); String fileToRead = streamReader.readLine(); BufferedReader fileReader = new BufferedReader(new FileReader(fileToRead)); String line = null; while((line=fileReader.readLine())!=null) streamWriter.println(line); fileReader.close(); streamWriter.close(); streamReader.close(); } catch(FileNotFoundException e) { System.out.println(""); } catch(IOException e) { System.out.println(""+e); } } public static void processRequest(Socket requestToHandle) { synchronized(pool) { pool.add(pool.size(), requestToHandle); pool.notifyAll(); } } public void run() { while(true) { synchronized(pool) { while(pool.isEmpty()) { try { pool.wait(); } catch(InterruptedException e) { e.printStackTrace(); } } connection= (Socket)pool.remove(0); } handleConnection(); } } } 這個(gè)助手類與 ConnectionHandler 非常相似,但它帶有處理連接池的手段。該類有兩個(gè)實(shí)例變量: • connection 是當(dāng)前正在處理的 Socket • 名為 pool 的靜態(tài) LinkedList 保存需被處理的連接 填充連接池 這里我們實(shí)現(xiàn)PooledConnectionHandler上的processRequest()方法,它將把傳入請(qǐng)求添加到池中,并告訴其它正在等待的對(duì)象該池已經(jīng)有一些內(nèi)容: public static void processRequest(Socket requestToHandle) { synchronized(pool) { pool.add(pool.size(), requestToHandle); pool.notifyAll(); } } synchronized 塊是個(gè)稍微有些不同的東西。您可以同步任何對(duì)象上的一個(gè)塊,而不只是在本身的某個(gè)方法中含有該塊的對(duì)象。在我們的示例中,processRequest() 方法包含有一個(gè) pool(請(qǐng)記住它是一個(gè) LinkedList,保存等待處理的連接池)的 synchronized塊。我們這樣做的原因是確保沒(méi)有別人能跟我們同時(shí)修改連接池。 既然我們已經(jīng)保證了我們是唯一“涉水”池中的人,我們就可以把傳入的Socket添加到LinkedList的尾端。一旦我們添加了新的連接,我們就用以下代碼通知其它正在等待該池的Thread,池現(xiàn)在已經(jīng)可用: pool.notifyAll(); Object的所有子類都繼承這個(gè)notifyAll()方法。這個(gè)方法,連同我們下一屏將要討論的wait()方法一起,就使一個(gè)Thread能夠讓另一個(gè)Thread知道一些條件已經(jīng)具備。這意味著該第二個(gè)Thread一定正在等待那些條件的滿足。 從池中獲取連接 這里我們實(shí)現(xiàn)PooledConnectionHandler上需作改動(dòng)的run()方法,它將在連接池上等待,并且池中一有連接就處理它: public void run() { while(true) { synchronized(pool) { while(pool.isEmpty()) { try { pool.wait(); } catch(InterruptedException e) { e.printStackTrace(); } } connection= (Socket)pool.remove(0); } handleConnection(); } } 回想一下在前面講過(guò)的:一個(gè)Thread正在等待有人通知它連接池方面的條件已經(jīng)滿足了。在我們的示例中,請(qǐng)記住我們有三個(gè)PooledConnectionHandler在等待使用池中的連接。每個(gè)PooledConnectionHandler都在它自已的Thread中運(yùn)行,并通過(guò)調(diào)用pool.wait()產(chǎn)生阻塞。當(dāng)我們的processRequest()在連接池上調(diào)用notifyAll()時(shí),所有正在等待的PooledConnectionHandler都將得到“池已經(jīng)可用”的通知。然后各自繼續(xù)前行調(diào)用pool.wait(),并重新檢查while(pool.isEmpty())循環(huán)條件。除了一個(gè)處理程序,其它池對(duì)所有處理程序都將是空的,因此,在調(diào)用pool.wait()時(shí),除了一個(gè)處理程序,其它所有處理程序都將再次產(chǎn)生阻塞。恰巧碰上非空池的處理程序?qū)⑻鰓hile(pool.isEmpty())循環(huán)并攫取池中的第一個(gè)連接: connection= (Socket)pool.remove(0); 處理程序一旦有一個(gè)連接可以使用,就調(diào)用 handleConnection() 處理它。 在我們的示例中,池中可能永遠(yuǎn)不會(huì)有多個(gè)連接,只是因?yàn)槭虑楹芸炀捅惶幚淼袅恕H绻刂杏幸粋€(gè)以上連接,那么其它處理程序?qū)⒉槐氐却碌倪B接被添加到池。當(dāng)它們檢查pool.isEmpty()條件時(shí),將發(fā)現(xiàn)其值為假,然后就從池中攫取一個(gè)連接并處理它。 還有另一件事需注意。當(dāng)run()擁有池的互斥鎖時(shí),processRequest()如何能夠把連接放到池中呢?答案是對(duì)池上的wait()的調(diào)用釋放鎖,而wait()接著就在自己返回之前再次攫取該鎖。這就使得池對(duì)象的其它同步代碼可以獲取該鎖。 處理連接:再一次 這里我們實(shí)現(xiàn)需做改動(dòng)的handleConnection()方法,該方法將攫取連接的流,使用它們,并在任務(wù)完成之后清除它們: public void handleConnection() { try { PrintWriter streamWriter = new PrintWriter(connection.getOutputStream()); BufferedReader streamReader = new BufferedReader(new InputStreamReader(connection.getInputStream())); String fileToRead = streamReader.readLine(); BufferedReader fileReader = new BufferedReader(new FileReader(fileToRead)); String line = null; while((line=fileReader.readLine())!=null) streamWriter.println(line); fileReader.close(); streamWriter.close(); streamReader.close(); } catch(FileNotFoundException e) { System.out.println(""); } catch(IOException e) { System.out.println(""+e); } } 跟在多線程服務(wù)器中不同,我們的PooledConnectionHandler有一個(gè)handleConnection()方法。這個(gè)方法的代碼跟非池式的ConnectionHandler上的run()方法的代碼完全一樣。首先,我們把OutputStream和InputStream分別包裝進(jìn)(用Socket上的getOutputStream()和getInputStream())BufferedReader和PrintWriter。然后我們逐行讀目標(biāo)文件,就象我們?cè)诙嗑€程示例中做的那樣。再一次,我們獲取一些字節(jié)之后就把它們放到本地的line變量中,然后寫(xiě)出到客戶機(jī)。完成讀寫(xiě)操作之后,我們關(guān)閉FileReader和打開(kāi)的流。 總結(jié)一下帶有連接池的服務(wù)器 讓我們回顧一下創(chuàng)建和使用“池版”服務(wù)器的步驟: 1. 創(chuàng)建一個(gè)新種類的連接處理程序(我們稱之為 PooledConnectionHandler)來(lái)處理池中的連接。 2. 修改服務(wù)器以創(chuàng)建和使用一組 PooledConnectionHandler。 Java 語(yǔ)言簡(jiǎn)化了套接字在應(yīng)用程序中的使用。它的基礎(chǔ)實(shí)際上是 java.net 包中的 Socket 和 ServerSocket 類。一旦您理解了表象背后發(fā)生的情況,就能容易地使用這些類。在現(xiàn)實(shí)生活中使用套接字只是這樣一件事,即通過(guò)貫徹優(yōu)秀的 OO 設(shè)計(jì)原則來(lái)保護(hù)應(yīng)用程序中各層間的封裝。我們?yōu)槟故玖艘恍┯袔椭念悺_@些類的結(jié)構(gòu)對(duì)我們的應(yīng)用程序隱藏了 Socket 交互作用的低級(jí)細(xì)節(jié) ? 使應(yīng)用程序能只使用可插入的 ClientSocketFacade 和 ServerSocketFacade。在有些地方(在 Facade 內(nèi)),您仍然必須管理稍顯雜亂的字節(jié)細(xì)節(jié),但您只須做一次就可以了。更好的是,您可以在將來(lái)的項(xiàng)目中重用這些低級(jí)別的助手類。 |
posted on 2009-01-04 12:56 liujg 閱讀(258) 評(píng)論(0) 編輯 收藏