Vincent.Chan‘s Blog

          常用鏈接

          統(tǒng)計(jì)

          積分與排名

          網(wǎng)站

          最新評(píng)論

          Java5 多線程實(shí)踐

          文檔選項(xiàng)
          將此頁作為電子郵件發(fā)送

          將此頁作為電子郵件發(fā)送

          未顯示需要 JavaScript 的文檔選項(xiàng)


          對(duì)此頁的評(píng)價(jià)

          幫助我們改進(jìn)這些內(nèi)容


          張黃矚 , 軟件工程師

          2006 年 1 月 18 日

          Java5 增加了新的類庫并發(fā)集java.util.concurrent,該類庫為并發(fā)程序提供了豐富的API多線程編程在Java 5中更加容易,靈活。本文通過一個(gè)網(wǎng)絡(luò)服務(wù)器模型,來實(shí)踐Java5的多線程編程,該模型中使用了Java5中的線程池,阻塞隊(duì)列,可重入鎖等,還實(shí)踐了 Callable, Future等接口,并使用了Java 5的另外一個(gè)新特性泛型。

          簡介

          本文將實(shí)現(xiàn)一個(gè)網(wǎng)絡(luò)服務(wù)器模型,一旦有客戶端連接到該服務(wù)器,則啟動(dòng)一個(gè)新線程為該連接服務(wù),服務(wù)內(nèi)容為往客戶端輸送一些字符信息。一個(gè)典型的網(wǎng)絡(luò)服務(wù)器模型如下:

          1. 建立監(jiān)聽端口。

          2. 發(fā)現(xiàn)有新連接,接受連接,啟動(dòng)線程,執(zhí)行服務(wù)線程。

          3. 服務(wù)完畢,關(guān)閉線程。

          這 個(gè)模型在大部分情況下運(yùn)行良好,但是需要頻繁的處理用戶請(qǐng)求而每次請(qǐng)求需要的服務(wù)又是簡短的時(shí)候,系統(tǒng)會(huì)將大量的時(shí)間花費(fèi)在線程的創(chuàng)建銷毀。Java 5的線程池克服了這些缺點(diǎn)。通過對(duì)重用線程來執(zhí)行多個(gè)任務(wù),避免了頻繁線程的創(chuàng)建與銷毀開銷,使得服務(wù)器的性能方面得到很大提高。因此,本文的網(wǎng)絡(luò)服務(wù)器 模型將如下:

          1. 建立監(jiān)聽端口,創(chuàng)建線程池。

          2. 發(fā)現(xiàn)有新連接,使用線程池來執(zhí)行服務(wù)任務(wù)。

          3. 服務(wù)完畢,釋放線程到線程池。

          下面詳細(xì)介紹如何使用Java 5的concurrent包提供的API來實(shí)現(xiàn)該服務(wù)器。



          回頁首


          初始化

          初 始化包括創(chuàng)建線程池以及初始化監(jiān)聽端口。 創(chuàng)建線程池可以通過調(diào)用java.util.concurrent.Executors類里的靜態(tài)方法newChahedThreadPool或是 newFixedThreadPool來創(chuàng)建,也可以通過新建一個(gè)java.util.concurrent.ThreadPoolExecutor實(shí)例 來執(zhí)行任務(wù)。這里我們采用newFixedThreadPool方法來建立線程池。

          ExecutorService pool = Executors.newFixedThreadPool(10);
          表示新建了一個(gè)線程池,線程池里面有10個(gè)線程為任務(wù)隊(duì)列服務(wù)。

          使用ServerSocket對(duì)象來初始化監(jiān)聽端口。




          private static final int PORT = 19527;
          serverListenSocket = new ServerSocket(PORT);
          serverListenSocket.setReuseAddress(true);
          serverListenSocket.setReuseAddress(true);



          回頁首


          服務(wù)新連接

          當(dāng)有新連接建立時(shí),accept返回時(shí),將服務(wù)任務(wù)提交給線程池執(zhí)行。





          while(true){
          Socket socket = serverListenSocket.accept();
          pool.execute(new ServiceThread(socket));

          }

          這里使用線程池對(duì)象來執(zhí)行線程,減少了每次線程創(chuàng)建和銷毀的開銷。任務(wù)執(zhí)行完畢,線程釋放到線程池。



          回頁首


          服務(wù)任務(wù)

          服 務(wù)線程ServiceThread維護(hù)一個(gè)count來記錄服務(wù)線程被調(diào)用的次數(shù)。每當(dāng)服務(wù)任務(wù)被調(diào)用一次時(shí),count的值自增1,因此 ServiceThread提供一個(gè)increaseCount和getCount的方法,分別將count值自增1和取得該count值。由于可能多個(gè) 線程存在競爭,同時(shí)訪問count,因此需要加鎖機(jī)制,在Java 5之前,我們只能使用synchronized來鎖定。Java 5中引入了性能更加粒度更細(xì)的重入鎖ReentrantLock。我們使用ReentrantLock保證代碼線程安全。下面是具體代碼:





          private static ReentrantLock lock = new ReentrantLock ();
          private static int count = 0;
          private int getCount(){
          int ret = 0;
          try{
          lock.lock();
          ret = count;
          }finally{
          lock.unlock();
          }
          return ret;
          }
          private void increaseCount(){
          try{
          lock.lock();
          ++count;
          }finally{
          lock.unlock();
          }
          }

          服務(wù)線程在開始給客戶端打印一個(gè)歡迎信息,





          increaseCount();
          int curCount = getCount();
          helloString = "hello, id = " + curCount+"\r\n";
          dos = new DataOutputStream(connectedSocket.getOutputStream());
          dos.write(helloString.getBytes());

          然后使用ExecutorService的submit 方法提交一個(gè)Callable的任務(wù),返回一個(gè)Future接口的引用。這種做法對(duì)費(fèi)時(shí)的任務(wù)非常有效,submit任務(wù)之后可以繼續(xù)執(zhí)行下面的代碼,然 后在適當(dāng)?shù)奈恢每梢允褂肍uture的get方法來獲取結(jié)果,如果這時(shí)候該方法已經(jīng)執(zhí)行完畢,則無需等待即可獲得結(jié)果,如果還在執(zhí)行,則等待到運(yùn)行完畢。





          ExecutorService executor = Executors.newSingleThreadExecutor();
          Future <String> future = executor.submit(new TimeConsumingTask());
          dos.write("let's do soemthing other".getBytes());
          String result = future.get();
          dos.write(result.getBytes());
          其中TimeConsumingTask實(shí)現(xiàn)了Callable接口
          class TimeConsumingTask implements Callable <String>{
          public String call() throws Exception {
          System.out.println
          ("It's a time-consuming task,
          you'd better retrieve your result in the furture");
          return "ok, here's the result: It takes me lots of time to produce this result";
          }
          }

          這里使用了Java 5的另外一個(gè)新特性泛型,聲明TimeConsumingTask的時(shí)候使用了String做為類型參數(shù)。必須實(shí)現(xiàn)Callable接口的call函數(shù), 其作用類似與Runnable中的run函數(shù),在call函數(shù)里寫入要執(zhí)行的代碼,其返回值類型等同于在類聲明中傳入的類型值。在這段程序中,我們提交了 一個(gè)Callable的任務(wù),然后程序不會(huì)堵塞,而是繼續(xù)執(zhí)行dos.write("let's do soemthing other".getBytes());當(dāng)程序執(zhí)行到String result = future.get()時(shí)如果call函數(shù)已經(jīng)執(zhí)行完畢,則取得返回值,如果還在執(zhí)行,則等待其執(zhí)行完畢。



          回頁首


          服務(wù)器端的完整實(shí)現(xiàn)

          服務(wù)器端的完整實(shí)現(xiàn)代碼如下:





          package com.andrew;

          import java.io.DataOutputStream;
          import java.io.IOException;
          import java.io.Serializable;
          import java.net.ServerSocket;
          import java.net.Socket;
          import java.util.concurrent.ArrayBlockingQueue;
          import java.util.concurrent.BlockingQueue;
          import java.util.concurrent.Callable;
          import java.util.concurrent.ExecutionException;
          import java.util.concurrent.ExecutorService;
          import java.util.concurrent.Executors;
          import java.util.concurrent.Future;
          import java.util.concurrent.RejectedExecutionHandler;
          import java.util.concurrent.ThreadPoolExecutor;
          import java.util.concurrent.TimeUnit;
          import java.util.concurrent.locks.ReentrantLock;

          public class Server {

          private static int produceTaskSleepTime = 100;

          private static int consumeTaskSleepTime = 1200;

          private static int produceTaskMaxNumber = 100;

          private static final int CORE_POOL_SIZE = 2;

          private static final int MAX_POOL_SIZE = 100;

          private static final int KEEPALIVE_TIME = 3;

          private static final int QUEUE_CAPACITY = (CORE_POOL_SIZE + MAX_POOL_SIZE) / 2;

          private static final TimeUnit TIME_UNIT = TimeUnit.SECONDS;

          private static final String HOST = "127.0.0.1";

          private static final int PORT = 19527;

          private BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<Runnable>(
          QUEUE_CAPACITY);

          //private ThreadPoolExecutor serverThreadPool = null;

          private ExecutorService pool = null;

          private RejectedExecutionHandler rejectedExecutionHandler = new
          ThreadPoolExecutor.DiscardOldestPolicy();

          private ServerSocket serverListenSocket = null;

          private int times = 5;

          public void start() {
          // You can also init thread pool in this way.
          /*serverThreadPool = new ThreadPoolExecutor(CORE_POOL_SIZE,
          MAX_POOL_SIZE, KEEPALIVE_TIME, TIME_UNIT, workQueue,
          rejectedExecutionHandler);*/
          pool = Executors.newFixedThreadPool(10);
          try {
          serverListenSocket = new ServerSocket(PORT);
          serverListenSocket.setReuseAddress(true);

          System.out.println("I'm listening");
          while (times-- > 0) {
          Socket socket = serverListenSocket.accept();
          String welcomeString = "hello";
          //serverThreadPool.execute(new ServiceThread(socket, welcomeString));
          pool.execute(new ServiceThread(socket));
          }
          } catch (IOException e) {
          // TODO Auto-generated catch block
          e.printStackTrace();
          }
          cleanup();
          }

          public void cleanup() {
          if (null != serverListenSocket) {
          try {
          serverListenSocket.close();
          } catch (IOException e) {
          // TODO Auto-generated catch block
          e.printStackTrace();
          }
          }
          //serverThreadPool.shutdown();
          pool.shutdown();
          }

          public static void main(String args[]) {
          Server server = new Server();
          server.start();
          }
          }

          class ServiceThread implements Runnable, Serializable {
          private static final long serialVersionUID = 0;

          private Socket connectedSocket = null;

          private String helloString = null;

          private static int count = 0;

          private static ReentrantLock lock = new ReentrantLock();

          ServiceThread(Socket socket) {
          connectedSocket = socket;
          }

          public void run() {
          increaseCount();
          int curCount = getCount();
          helloString = "hello, id = " + curCount + "\r\n";

          ExecutorService executor = Executors.newSingleThreadExecutor();
          Future<String> future = executor.submit(new TimeConsumingTask());

          DataOutputStream dos = null;
          try {
          dos = new DataOutputStream(connectedSocket.getOutputStream());
          dos.write(helloString.getBytes());
          try {
          dos.write("let's do soemthing other.\r\n".getBytes());
          String result = future.get();
          dos.write(result.getBytes());
          } catch (InterruptedException e) {
          e.printStackTrace();
          } catch (ExecutionException e) {
          e.printStackTrace();
          }
          } catch (IOException e) {
          // TODO Auto-generated catch block
          e.printStackTrace();
          } finally {
          if (null != connectedSocket) {
          try {
          connectedSocket.close();
          } catch (IOException e) {
          // TODO Auto-generated catch block
          e.printStackTrace();
          }
          }
          if (null != dos) {
          try {
          dos.close();
          } catch (IOException e) {
          // TODO Auto-generated catch block
          e.printStackTrace();
          }
          }
          executor.shutdown();
          }
          }

          private int getCount() {
          int ret = 0;
          try {
          lock.lock();
          ret = count;
          } finally {
          lock.unlock();
          }
          return ret;
          }

          private void increaseCount() {
          try {
          lock.lock();
          ++count;
          } finally {
          lock.unlock();
          }
          }
          }

          class TimeConsumingTask implements Callable<String> {
          public String call() throws Exception {
          System.out
          .println("It's a time-consuming task,
          you'd better retrieve your result in the furture");
          return "ok, here's the result: It takes me lots of time to produce this result";
          }

          }



          回頁首


          運(yùn)行程序

          運(yùn)行服務(wù)端,客戶端只需使用telnet 127.0.0.1 19527 即可看到信息如下:




          回頁首




          回頁首


          關(guān)于作者


          張黃矚 聯(lián)系方式zhanghuangzhu@gmail.com,熟悉 WBI Server Foundation, WPS 6.0,對(duì)Java,C/C++編程有濃厚的興趣。

          posted on 2006-02-18 15:05 Vincent.Chen 閱讀(129) 評(píng)論(0)  編輯  收藏 所屬分類: Java

          主站蜘蛛池模板: 同心县| 涟源市| 通许县| 南宁市| 石屏县| 永安市| 秦安县| 宣汉县| 柞水县| 德清县| 商河县| 黄冈市| 封丘县| 琼海市| 博罗县| 津南区| 丰原市| 巫山县| 婺源县| 商都县| 宝丰县| 常德市| 义乌市| 晋城| 乐都县| 达尔| 宜兰市| 塔河县| 酒泉市| 从化市| 高唐县| 莱州市| 三江| 延寿县| 驻马店市| 丰台区| 海淀区| 和林格尔县| 保亭| 南开区| 泽库县|