programmer's home, welcome here!

          technical issues and my life

          常用鏈接

          統計

          最新評論

          Java 通用線程池-轉自CSDN

          感謝ryang的勞動!

          Java實現通用線程池

          線程池通俗的描述就是預先創建若干空閑線程,等到需要用多線程去處理事務的時候去喚醒某些空閑線程執行處理任務,這樣就省去了頻繁創建線程的時間,因為頻繁創建線程是要耗費大量的CPU資源的。如果一個應用程序需要頻繁地處理大量并發事務,不斷的創建銷毀線程往往會大大地降低系統的效率,這時候線程池就派上用場了。
                本文旨在使用Java語言編寫一個通用的線程池。當需要使用線程池處理事務時,只需按照指定規范封裝好事務處理對象,然后用已有的線程池對象去自動選擇空閑線程自動調用事務處理對象即可。并實現線程池的動態修改(修改當前線程數,最大線程數等)。下面是實現代碼:

          //ThreadTask .java

           

          package polarman.threadpool;

          /** *//**
           * 線程任務
           * @author ryang
           * 2006-8-8
           
          */

          public 
          interface ThreadTask ...{
              
          public void run();
          }



          //PooledThread.java

          package polarman.threadpool;

          import java.util.Collection;
          import java.util.Vector;

          /** *//**
           * 接受線程池管理的線程
           * @author ryang
           * 2006-8-8
           
          */

          public 
          class PooledThread extends Thread ...{
             
              
          protected Vector tasks = new Vector();
              
          protected boolean running = false;
              
          protected boolean stopped = false;
              
          protected boolean paused = false;
              
          protected boolean killed = false;
              private ThreadPool pool;
             
              
          public PooledThread(ThreadPool pool)...{
                  
          this.pool = pool;
              }

             
              public 
          void putTask(ThreadTask task)...{
                  tasks.add(task);
              }

             
              public 
          void putTasks(ThreadTask[] tasks)...{
                  
          for(int i=0; i<tasks.length; i++)
                      
          this.tasks.add(tasks[i]);
              }

             
              public 
          void putTasks(Collection tasks)...{
                  
          this.tasks.addAll(tasks);
              }

             
              protected
           ThreadTask popTask()...{
                  
          if(tasks.size() > 0)
                      
          return (ThreadTask)tasks.remove(0);
                  else
                      return
           null;
              }

             
              public 
          boolean isRunning()...{
                  
          return running;
              }

             
              public 
          void stopTasks()...{
                  stopped 
          = true;
              }

             
              public 
          void stopTasksSync()...{
                  stopTasks();
                  
          while(isRunning())...{
                      
          try ...{
                          sleep(5);
                      }
           catch
           (InterruptedException e) ...{
                      }

                  }

              }

             
              public 
          void pauseTasks()...{
                  paused 
          = true;
              }

             
              public 
          void pauseTasksSync()...{
                  pauseTasks();
                  
          while(isRunning())...{
                      
          try ...{
                          sleep(5);
                      }
           catch
           (InterruptedException e) ...{
                      }

                  }

              }

             
              public 
          void kill()...{
                  
          if(!running)
                      interrupt();
                  else
                      killed =
           true;
              }

             
              public 
          void killSync()...{
                  kill();
                  
          while(isAlive())...{
                      
          try ...{
                          sleep(5);
                      }
           catch
           (InterruptedException e) ...{
                      }

                  }

              }

             
              public 
          synchronized void startTasks()...{
                  running 
          = true;
                  
          this.notify();
              }

             
              public 
          synchronized void run()...{
                  
          try...{
                      
          while(true)...{
                          
          if(!running || tasks.size() == 0)...{
                              pool.notifyForIdleThread();
                              
          //System.out.println(Thread.currentThread().getId() + ": 空閑");
                              this.wait();
                          }
          else
          ...{
                              ThreadTask task;
                              
          while((task = popTask()) != null)...{
                                  task.run();
                                  
          if(stopped)...{
                                      stopped 
          = false;
                                      
          if(tasks.size() > 0)...{
                                          tasks.clear();
                                          System.out.println(Thread.currentThread().getId() 
          + ": Tasks are stopped");
                                          
          break;
                                      }

                                  }

                                  if
          (paused)...{
                                      paused 
          = false;
                                      
          if(tasks.size() > 0)...{
                                          System.out.println(Thread.currentThread().getId() 
          + ": Tasks are paused");
                                          
          break;
                                      }

                                  }

                              }

                              running =
           false;
                          }


                          if
          (killed)...{
                              killed 
          = false;
                              
          break;
                          }

                      }

                  }
          catch
          (InterruptedException e)...{
                      
          return;
                  }

                 
                  //System.out.println(Thread.currentThread().getId() + ": Killed");
              }

          }



          //ThreadPool.java


          下面是線程池的測試程序
          //ThreadPoolTest.java

          ThreadPool pool = new ThreadPool(3, 2);
          pool.init();

          要處理的任務實現ThreadTask...接口即可(如測試代碼里的SimpleTask),這個接口只有一個方法run()
          兩行代碼即可調用:

          ThreadTask task = ... //實例化你的任務對象
          pool.processTask(task);

          package polarman.threadpool;

          import java.util.Collection;
          import java.util.Iterator;
          import java.util.Vector;

          /** *//**
           * 線程池
           * @author ryang
           * 2006-8-8
           
          */

          public 
          class ThreadPool ...{
             
              
          protected int maxPoolSize;
              
          protected int initPoolSize;
              
          protected Vector threads = new Vector();
              
          protected boolean initialized = false;
              
          protected boolean hasIdleThread = false;
             
              
          public ThreadPool(int maxPoolSize, int initPoolSize)...{
                  
          this.maxPoolSize = maxPoolSize;
                  
          this.initPoolSize = initPoolSize;
              }

             
              public 
          void init()...{
                  initialized 
          = true;
                  
          for(int i=0; i<initPoolSize; i++)...{
                      PooledThread thread 
          = new PooledThread(this);
                      thread.start();
                      threads.add(thread);
                  }

                 
                  //System.out.println("線程池初始化結束,線程數=" + threads.size() + " 最大線程數=" + maxPoolSize);
              }

             
              public 
          void setMaxPoolSize(int maxPoolSize)...{
                  
          //System.out.println("重設最大線程數,最大線程數=" + maxPoolSize);
                  
          this.maxPoolSize = maxPoolSize;
                  
          if(maxPoolSize < getPoolSize())
                      setPoolSize(maxPoolSize);
              }

             
              
          /**
               * 重設當前線程數
               * 若需殺掉某線程,線程不會立刻殺掉,而會等到線程中的事務處理完成
               * 但此方法會立刻從線程池中移除該線程,不會等待事務處理結束
               * @param size
               
          */

              public 
          void setPoolSize(int size)...{
                  
          if(!initialized)...{
                      initPoolSize 
          = size;
                      return;
                  }
          else 
          if(size > getPoolSize())...{
                      
          for(int i=getPoolSize(); i<size && i<maxPoolSize; i++)...{
                          PooledThread thread 
          = new PooledThread(this);
                          thread.start();
                          threads.add(thread);
                      }

                  }
          else 
          if(size < getPoolSize())...{
                      
          while(getPoolSize() > size)...{
                          PooledThread th 
          = (PooledThread)threads.remove(0);
                          th.kill();
                      }

                  }

                 
                  //System.out.println("重設線程數,線程數=" + threads.size());
              }

             
              public 
          int getPoolSize()...{
                  
          return threads.size();
              }

             
              protected 
          void notifyForIdleThread()...{
                  hasIdleThread 
          = true;
              }

             
              protected 
          boolean waitForIdleThread()...{
                  hasIdleThread 
          = false;
                  
          while(!hasIdleThread && getPoolSize() >= maxPoolSize)...{
                      
          try ...{
                          Thread.sleep(5);
                      }
           catch
           (InterruptedException e) ...{
                          
          return false;
                      }

                  }

                 
                  return
           true;
              }

             
              public 
          synchronized PooledThread getIdleThread()...{
                  
          while(true)...{
                      
          for(Iterator itr=threads.iterator(); itr.hasNext();)...{
                          PooledThread th = (PooledThread)itr.next();
                          
          if(!th.isRunning())
                              
          return th;
                      }

                     
                      if(getPoolSize() 
          < maxPoolSize)...{
                          PooledThread thread 
          = new PooledThread(this);
                          thread.start();
                          threads.add(thread);
                          
          return thread;
                      }

                     
                      //System.out.println("線程池已滿,等待...");
                      
          if(waitForIdleThread() == false)
                          
          return null;
                  }

              }

             
              public 
          void processTask(ThreadTask task)...{
                  PooledThread th = getIdleThread();
                  
          if(th != null)...{
                      th.putTask(task);
                      th.startTasks();
                  }

              }

             
              public 
          void processTasksInSingleThread(ThreadTask[] tasks)...{
                  PooledThread th = getIdleThread();
                  
          if(th != null)...{
                      th.putTasks(tasks);
                      th.startTasks();
                  }

              }

             
              public 
          void processTasksInSingleThread(Collection tasks)...{
                  PooledThread th = getIdleThread();
                  
          if(th != null)...{
                      th.putTasks(tasks);
                      th.startTasks();
                  }

              }

          }


           

          import java.io.BufferedReader;
          import java.io.IOException;
          import java.io.InputStreamReader;

          import polarman.threadpool.ThreadPool;
          import polarman.threadpool.ThreadTask;

          public class ThreadPoolTest ...{

              
          public static void main(String[] args) ...{
                  System.out.println(
          ""quit" 退出");
                  System.out.println(
          ""task A 10" 啟動任務B,時長為10秒");
                  System.out.println(
          ""size 2" 設置當前線程池大小為2");
                  System.out.println(
          ""max 3" 設置線程池最大線程數為3");
                 
          System.out.println();

                 
                  final ThreadPool pool = new ThreadPool(3, 2);
                  pool.init();
                  
                  Thread cmdThread 
          = new Thread()...{
                      
          public void run()...{
                          
                          BufferedReader reader 
          = new BufferedReader(new InputStreamReader(System.in));
                          
                          
          while(true)...{
                              
          try ...{
                                  String line = reader.readLine();
                                  String words[] 
          = line.split(" ");
                                  
          if(words[0].equalsIgnoreCase("quit"))...{
                                      System.exit(0);
                                  }
          else 
          if(words[0].equalsIgnoreCase("size"&& words.length >= 2)...{
                                      
          try...{
                                          
          int size = Integer.parseInt(words[1]);
                                          pool.setPoolSize(size);
                                      }
          catch
          (Exception e)...{
                                      }

                                  }
          else 
          if(words[0].equalsIgnoreCase("max"&& words.length >= 2)...{
                                      
          try...{
                                          
          int max = Integer.parseInt(words[1]);
                                          pool.setMaxPoolSize(max);
                                      }
          catch
          (Exception e)...{
                                      }

                                  }
          else 
          if(words[0].equalsIgnoreCase("task"&& words.length >= 3)...{
                                      
          try...{
                                          
          int timelen = Integer.parseInt(words[2]);
                                          SimpleTask task 
          = new SimpleTask(words[1], timelen * 1000);
                                          pool.processTask(task);
                                      }
          catch
          (Exception e)...{
                                      }

                                  }

                                  
                              }
           catch
           (IOException e) ...{
                                  e.printStackTrace();
                              }

                          }

                      }

                  }
          ;
                  
                  cmdThread.start();
                  
          /**//*
                  for(int i=0; i<10; i++){
                      SimpleTask task = new SimpleTask("Task" + i, (i+10)*1000);
                      pool.processTask(task);
                  }
          */

              }


          }


          class SimpleTask 
          implements ThreadTask...{
              
              private String taskName;
              
          private int timeLen;
              
              
          public SimpleTask(String taskName, int timeLen)...{
                  
          this.taskName = taskName;
                  
          this.timeLen = timeLen;
              }

              
              public 
          void run() ...{
                  System.out.println(Thread.currentThread().getId() +
                          "
          : START TASK "" + taskName + """);
                  
          try ...{
                      Thread.sleep(timeLen);
                  }
           
          catch (InterruptedException e) ...{
                  }

                  
                  System.out.println(Thread.currentThread().getId() 
          +
                          ": END TASK "
          " + taskName + """);
              }

              
          }



          使用此線程池相當簡單,下面兩行代碼初始化線程池:

          posted on 2007-04-12 23:33 crazy zerlot 閱讀(477) 評論(0)  編輯  收藏 所屬分類: J2SE & J2EE ABC

          主站蜘蛛池模板: 马山县| 刚察县| 铅山县| 新化县| 洱源县| 锡林郭勒盟| 北海市| 中江县| 上犹县| 右玉县| 临潭县| 曲靖市| 将乐县| 兰溪市| 迭部县| 灌南县| 正定县| 德化县| 巩留县| 福鼎市| 永新县| 宁晋县| 金乡县| 临沧市| 大城县| 星子县| 清原| 灌南县| 莱西市| 防城港市| 名山县| 宝鸡市| 贵南县| 亚东县| 那曲县| 平乐县| 弋阳县| 肥乡县| 岗巴县| 孟津县| 华阴市|