轉:用Java Socket開發小型服務器,支持上千個并發(下)
6 創建多線程Socket服務器
前面的示例教給您基礎知識,但并不能令您更深入。如果您到此就停止了,那么您一次只能處理一臺客戶機。原因是handleConnection()是一個阻塞方法。只有當它完成了對當前連接的處理時,服務器才能接受另一個客戶機。在多數時候,您將需要(也有必要)一個多線程服務器。 創建 MultithreadedRemoteFileServer 類 import java.io.*; import java.net.*; public class MultithreadedRemoteFileServer { int listenPort; public MultithreadedRemoteFileServer(int listenPort) { this.listenPort=listenPort; } //允許客戶機連接到服務器,等待客戶機請求 public void acceptConnections() { try { ServerSocket server = new ServerSocket(listenPort, 5); Socket incomingConnection = null; while(true) { incomingConnection = server.accept(); handleConnection(incomingConnection); } } catch(BindException e) { System.out.println("Unable to bind to port "+listenPort); } catch(IOException e) { System.out.println("Unable to instantiate a ServerSocket on port: "+listenPort); } } //與客戶機Socket交互以將客戶機所請求的文件的內容發送到客戶機 public void handleConnection(Socket connectionToHandle) { new Thread(new ConnectionHandler(connectionToHandle)).start(); } public static void main(String args[]) { MultithreadedRemoteFileServer server = new MultithreadedRemoteFileServer(1001); server.acceptConnections(); } } 這里我們實現改動過acceptConnections()方法,它將創建一個能夠處理待發請求的ServerSocket,并告訴ServerSocket接受連接。 新的 server 仍然需要acceptConnections(),所以這些代碼實際上是一樣的。突出顯示的行表示一個重大的不同。對這個多線程版,我們現在可以指定客戶機請求的最大數目,這些請求都能在實例化ServerSocket期間處于待發狀態。如果我們沒有指定客戶機請求的最大數目,則我們假設使用缺省值50。 這里是它的工作機制。假設我們指定待發數(backlog 值)是5并且有五臺客戶機請求連接到我們的服務器。我們的服務器將著手處理第一個連接,但處理該連接需要很長時間。由于我們的待發值是5,所以我們一次可以放五個請求到隊列中。我們正在處理一個,所以這意味著還有其它五個正在等待。等待的和正在處理的一共有六個。當我們的服務器仍忙于接受一號連接(記住隊列中還有 2?6 號)時,如果有第七個客戶機提出連接申請,那么,該第七個客戶機將遭到拒絕。我們將在帶有連接池服務器示例中說明如何限定能同時連接的客戶機數目。 處理連接: public void handleConnection(Socket connectionToHandle) { new Thread(new ConnectionHandler(connectionToHandle)).start(); } 我們對RemoteFileServer所做的大改動就體現在這個方法上。我們仍然在服務器接受一個連接之后調用handleConnection(),但現在我們把該Socket傳遞給ConnectionHandler的一個實例,它是 Runnable的。我們用ConnectionHandler創建一個新 Thread 并啟動它。ConnectionHandler的run()方法包Socket讀/寫和讀File的代碼,這些代碼原來在RemoteFileServer的handleConnection()中。 創建 ConnectionHandler 類 import java.io.*; import java.net.*; public class ConnectionHandler implements Runnable { protected Socket socketToHandle; public ConnectionHandler(Socket socketToHandle) { this.socketToHandle=socketToHandle; } public void run() { try { PrintWriter streamWriter = new PrintWriter(socketToHandle.getOutputStream()); BufferedReader streamReader = new BufferedReader(new InputStreamReader(socketToHandle.getInputStream())); String fileToRead = streamReader.readLine(); BufferedReader fileReader = new BufferedReader(new FileReader(fileToRead)); String line =null; while((line=fileReader.readLine())!=null) { streamWriter.println(line); } fileReader.close(); streamWriter.close(); streamReader.close(); } catch(Exception e) { System.out.println("Error handling a client: "+e); e.printStackTrace(); } } } 這個助手類相當簡單。跟我們到目前為止的其它類一樣,我們導入java.net和java.io。該類只有一個實例變量socketToHandle,它保存由該實例處理的Socket。 類的構造器用一個Socket實例作參數并將它賦給socketToHandle。 請注意該類實現了Runnable接口。實現這個接口的類都必須實現run()方法。這里我們實現run()方法,它將攫取我們的連接的流,用它來讀寫該連接,并在任務完成之后關閉它。ConnectionHandler的run()方法所做的事情就是RemoteFileServer上的handleConnection()所做的事情。首先,我們把InputStream和OutputStream分別包裝(用Socket的getOutputStream()和 getInputStream())進BufferedReader和PrintWriter。然后我們用這些代碼逐行地讀目標文件: PrintWriter streamWriter = new PrintWriter(socketToHandle.getOutputStream()); BufferedReader streamReader = new BufferedReader(new InputStreamReader(socketToHandle.getInputStream())); String fileToRead = streamReader.readLine(); BufferedReader fileReader = new BufferedReader(new FileReader(fileToRead)); String line =null; while((line=fileReader.readLine())!=null) { streamWriter.println(line); } 請記住我們應該從客戶機獲取一條有效的文件路徑,這樣用該路徑名構造一個新File,把它包裝進FileReader以處理讀文件的操作,然后把它包裝進BufferedReader以讓我們逐行地讀該文件。我們while循環中調用BufferedReader上的readLine()直到不再有要讀的行。請記注,對readLine()的調用將造成阻塞,直到有字節來到為止。我們獲取一些字節之后就把它們放到本地的line變量中,然后寫出到客戶機上。完成讀寫操作之后,我們關閉打開的流。 總結一下多線程服務器 讓我們回顧一下創建和使用“多線程版”的服務器的步驟: 1. 修改 acceptConnections() 以用缺省為 50(或任何您想要的大于 1 的指定數字)實例化 ServerSocket。 2. 修改 ServerSocket 的 handleConnection() 以用 ConnectionHandler 的一個實例生成一個新的 Thread。 3. 借用 RemoteFileServer 的 handleConnection() 方法的代碼實現 ConnectionHandler 類。 7 創建帶有連接池的Socket服務器 我們現在已經擁有的 MultithreadedServer 每當有客戶機申請一個連接時都在一個新Thread中創建一個新ConnectionHandler。這意味著可能有一捆Thread“躺”在我們周圍。而且創建Thread的系統開銷并不是微不足道的。如果性能成為了問題(也請不要事到臨頭才意識到它),更高效地處理我們的服務器是件好事。那么,我們如何更高效地管理服務器端呢?我們可以維護一個進入的連接池,一定數量的ConnectionHandler將為它提供服務。這種設計能帶來以下好處: • 它限定了允許同時連接的數目。 • 我們只需啟動ConnectionHandler Thread一次。 幸運的是,跟在我們的多線程示例中一樣,往代碼中添加“池”不需要來一個大改動。事實上,應用程序的客戶機端根本就不受影響。在服務器端,我們在服務器啟動時創建一定數量的 ConnectionHandler,我們把進入的連接放入“池”中并讓ConnectionHandler打理剩下的事情。這種設計中有很多我們不打算討論的可能存在的技巧。例如,我們可以通過限定允許在“池”中建立的連接的數目來拒絕客戶機。 請注意:我們將不會再次討論acceptConnections()。這個方法跟前面示例中的完全一樣。它無限循環地調用ServerSocket上的 accept() 并把連接傳遞到handleConnection()。 創建 PooledRemoteFileServer 類 import java.io.*; import java.net.*; import java.util.*; public class PooledRemoteFileServer { protected int maxConnections; protected int listenPort; protected ServerSocket serverSocket; public PooledRemoteFileServer(int aListenPort, int maxConnections) { listenPort= aListenPort; this.maxConnections = maxConnections; } public void acceptConnections() { try { ServerSocket server = new ServerSocket(listenPort, 5); Socket incomingConnection = null; while(true) { incomingConnection = server.accept(); handleConnection(incomingConnection); } } catch(BindException e) { System.out.println(""); } catch(IOException e) { System.out.println(""+listenPort); } } protected void handleConnection(Socket connectionToHandle) { PooledConnectionHandler.processRequest(connectionToHandle); } public void setUpHandlers() { for(int i=0; i<maxConnections; i++) { PooledConnectionHandler currentHandler = new PooledConnectionHandler(); new Thread(currentHandler, "Handler " + i).start(); } } public static void main(String args[]) { PooledRemoteFileServer server = new PooledRemoteFileServer(1001, 3); server.setUpHandlers(); server.acceptConnections(); } } 請注意一下您現在應該熟悉了的 import 語句。我們給類以下實例變量以保存: • 我們的服務器能同時處理的活動客戶機連接的最大數目 • 進入的連接的偵聽端口(我們沒有指定缺省值,但如果您想這樣做,并不會受到限制) • 將接受客戶機連接請求的 ServerSocket 類的構造器用的參數是偵聽端口和連接的最大數目 我們的類有一個 main() 方法和三個其它方法。稍后我們將探究這些方法的細節。現在只須知道setUpHandlers()創建數目為maxConnections的大量PooledConnectionHandler,而其它兩個方法則與我們前面已經看到的相似:acceptConnections()在ServerSocket上偵聽傳入的客戶機連接,而handleConnection則在客戶機連接一旦被建立后就實際處理它。 實現 main() 這里我們實現需作改動的main()方法,該方法將創建能夠處理給定數目的客戶機連接的PooledRemoteFileServer,并告訴它接受連接: public static void main(String args[]) { PooledRemoteFileServer server = new PooledRemoteFileServer(1001, 3); server.setUpHandlers(); server.acceptConnections(); } 我們的main()方法很簡單。我們實例化一個新的PooledRemoteFileServer,它將通過調用setUpHandlers()來建立三個PooledConnectionHandler。一旦服務器就緒,我們就告訴它acceptConnections()。 建立連接處理程序 public void setUpHandlers() { for(int i=0; i<maxConnections; i++) { PooledConnectionHandler currentHandler = new PooledConnectionHandler(); new Thread(currentHandler, "Handler " + i).start(); } } setUpHandlers()方法創建maxConnections(例如 3)個PooledConnectionHandler并在新Thread中激活它們。用實現了Runnable的對象來創建Thread使我們可以在Thread調用start()并且可以期望在Runnable上調用了run()。換句話說,我們的PooledConnectionHandler將等著處理進入的連接,每個都在它自己的Thread中進行。我們在示例中只創建三個Thread,而且一旦服務器運行,這就不能被改變。 處理連接 這里我們實現需作改動的handleConnections()方法,它將委派PooledConnectionHandler處理連接: protected void handleConnection(Socket connectionToHandle) { PooledConnectionHandler.processRequest(connectionToHandle); } 我們現在叫 PooledConnectionHandler 處理所有進入的連接(processRequest() 是一個靜態方法)。 創建 PooledRemoteFileServer 類 import java.io.*; import java.net.*; import java.util.*; public class PooledConnectionHandler implements Runnable { protected Socket connection; protected static List pool = new LinkedList(); public PooledConnectionHandler() {} public void handleConnection() { try { PrintWriter streamWriter = new PrintWriter(connection.getOutputStream()); BufferedReader streamReader = new BufferedReader(new InputStreamReader(connection.getInputStream())); String fileToRead = streamReader.readLine(); BufferedReader fileReader = new BufferedReader(new FileReader(fileToRead)); String line = null; while((line=fileReader.readLine())!=null) streamWriter.println(line); fileReader.close(); streamWriter.close(); streamReader.close(); } catch(FileNotFoundException e) { System.out.println(""); } catch(IOException e) { System.out.println(""+e); } } public static void processRequest(Socket requestToHandle) { synchronized(pool) { pool.add(pool.size(), requestToHandle); pool.notifyAll(); } } public void run() { while(true) { synchronized(pool) { while(pool.isEmpty()) { try { pool.wait(); } catch(InterruptedException e) { e.printStackTrace(); } } connection= (Socket)pool.remove(0); } handleConnection(); } } } 這個助手類與 ConnectionHandler 非常相似,但它帶有處理連接池的手段。該類有兩個實例變量: • connection 是當前正在處理的 Socket • 名為 pool 的靜態 LinkedList 保存需被處理的連接 填充連接池 這里我們實現PooledConnectionHandler上的processRequest()方法,它將把傳入請求添加到池中,并告訴其它正在等待的對象該池已經有一些內容: public static void processRequest(Socket requestToHandle) { synchronized(pool) { pool.add(pool.size(), requestToHandle); pool.notifyAll(); } } synchronized 塊是個稍微有些不同的東西。您可以同步任何對象上的一個塊,而不只是在本身的某個方法中含有該塊的對象。在我們的示例中,processRequest() 方法包含有一個 pool(請記住它是一個 LinkedList,保存等待處理的連接池)的 synchronized塊。我們這樣做的原因是確保沒有別人能跟我們同時修改連接池。 既然我們已經保證了我們是唯一“涉水”池中的人,我們就可以把傳入的Socket添加到LinkedList的尾端。一旦我們添加了新的連接,我們就用以下代碼通知其它正在等待該池的Thread,池現在已經可用: pool.notifyAll(); Object的所有子類都繼承這個notifyAll()方法。這個方法,連同我們下一屏將要討論的wait()方法一起,就使一個Thread能夠讓另一個Thread知道一些條件已經具備。這意味著該第二個Thread一定正在等待那些條件的滿足。 從池中獲取連接 這里我們實現PooledConnectionHandler上需作改動的run()方法,它將在連接池上等待,并且池中一有連接就處理它: public void run() { while(true) { synchronized(pool) { while(pool.isEmpty()) { try { pool.wait(); } catch(InterruptedException e) { e.printStackTrace(); } } connection= (Socket)pool.remove(0); } handleConnection(); } } 回想一下在前面講過的:一個Thread正在等待有人通知它連接池方面的條件已經滿足了。在我們的示例中,請記住我們有三個PooledConnectionHandler在等待使用池中的連接。每個PooledConnectionHandler都在它自已的Thread中運行,并通過調用pool.wait()產生阻塞。當我們的processRequest()在連接池上調用notifyAll()時,所有正在等待的PooledConnectionHandler都將得到“池已經可用”的通知。然后各自繼續前行調用pool.wait(),并重新檢查while(pool.isEmpty())循環條件。除了一個處理程序,其它池對所有處理程序都將是空的,因此,在調用pool.wait()時,除了一個處理程序,其它所有處理程序都將再次產生阻塞。恰巧碰上非空池的處理程序將跳出while(pool.isEmpty())循環并攫取池中的第一個連接: connection= (Socket)pool.remove(0); 處理程序一旦有一個連接可以使用,就調用 handleConnection() 處理它。 在我們的示例中,池中可能永遠不會有多個連接,只是因為事情很快就被處理掉了。如果池中有一個以上連接,那么其它處理程序將不必等待新的連接被添加到池。當它們檢查pool.isEmpty()條件時,將發現其值為假,然后就從池中攫取一個連接并處理它。 還有另一件事需注意。當run()擁有池的互斥鎖時,processRequest()如何能夠把連接放到池中呢?答案是對池上的wait()的調用釋放鎖,而wait()接著就在自己返回之前再次攫取該鎖。這就使得池對象的其它同步代碼可以獲取該鎖。 處理連接:再一次 這里我們實現需做改動的handleConnection()方法,該方法將攫取連接的流,使用它們,并在任務完成之后清除它們: public void handleConnection() { try { PrintWriter streamWriter = new PrintWriter(connection.getOutputStream()); BufferedReader streamReader = new BufferedReader(new InputStreamReader(connection.getInputStream())); String fileToRead = streamReader.readLine(); BufferedReader fileReader = new BufferedReader(new FileReader(fileToRead)); String line = null; while((line=fileReader.readLine())!=null) streamWriter.println(line); fileReader.close(); streamWriter.close(); streamReader.close(); } catch(FileNotFoundException e) { System.out.println(""); } catch(IOException e) { System.out.println(""+e); } } 跟在多線程服務器中不同,我們的PooledConnectionHandler有一個handleConnection()方法。這個方法的代碼跟非池式的ConnectionHandler上的run()方法的代碼完全一樣。首先,我們把OutputStream和InputStream分別包裝進(用Socket上的getOutputStream()和getInputStream())BufferedReader和PrintWriter。然后我們逐行讀目標文件,就象我們在多線程示例中做的那樣。再一次,我們獲取一些字節之后就把它們放到本地的line變量中,然后寫出到客戶機。完成讀寫操作之后,我們關閉FileReader和打開的流。 總結一下帶有連接池的服務器 讓我們回顧一下創建和使用“池版”服務器的步驟: 1. 創建一個新種類的連接處理程序(我們稱之為 PooledConnectionHandler)來處理池中的連接。 2. 修改服務器以創建和使用一組 PooledConnectionHandler。 Java 語言簡化了套接字在應用程序中的使用。它的基礎實際上是 java.net 包中的 Socket 和 ServerSocket 類。一旦您理解了表象背后發生的情況,就能容易地使用這些類。在現實生活中使用套接字只是這樣一件事,即通過貫徹優秀的 OO 設計原則來保護應用程序中各層間的封裝。我們為您展示了一些有幫助的類。這些類的結構對我們的應用程序隱藏了 Socket 交互作用的低級細節 ? 使應用程序能只使用可插入的 ClientSocketFacade 和 ServerSocketFacade。在有些地方(在 Facade 內),您仍然必須管理稍顯雜亂的字節細節,但您只須做一次就可以了。更好的是,您可以在將來的項目中重用這些低級別的助手類。 |