Dict.CN 在線詞典, 英語學習, 在線翻譯

          都市淘沙者

          荔枝FM Everyone can be host

          統計

          留言簿(23)

          積分與排名

          優秀學習網站

          友情連接

          閱讀排行榜

          評論排行榜

          java線程控制器實現(轉)

          java線程控制器代碼分享-根據cpu情況決定線程運行數量和情況

          原文地址:請點擊標題即可。


          在人人網海量存儲系統的存儲引擎部分,為了提高CPU和網絡的使用情況,使用了java多線程管理并行操作的方式。

          在java中控制線程是一件很簡單的事情,jdk提供了諸多的方法,其中比常用的兩個是notify()和wait(),一個是喚醒,一個等待線程,在下面的代碼中,將看到一個線程分配器,根據cpu的負載情況,自動完成對應線程的喚醒或者是等待操作。整個過程是一個平滑的過程,不會因為線程的切換而導致機器負載出線鋸齒。

          先看一個類,讀取Linux系統TOP等指令拿到系統當前負載:

              
          import java.io.BufferedReader;
              
          import java.io.InputStreamReader;

              
          /**
              * 節點的cpu 內存 磁盤空間 情況
              *
              * 
          @author zhen.chen
              *
              
          */
              
          public class NodeLoadView {

              
          /**
              * 獲取cpu使用情況
              *
              * 
          @return
              * 
          @throws Exception
              
          */
              
          public double getCpuUsage() throws Exception {
              
          double cpuUsed = 0;

              Runtime rt 
          = Runtime.getRuntime();
              Process p 
          = rt.exec(“/usr/bin/uptime”);// 調用系統的“top”命令
              String[] strArray = null;
              BufferedReader in 
          = null;
              
          try {
              in 
          = new BufferedReader(new InputStreamReader(p.getInputStream()));
              String str 
          = null;
              
          while ((str = in.readLine()) != null) {
              strArray 
          = str.split(“load average: “);
              strArray 
          = strArray[1].split(“,”);
              cpuUsed 
          = Double.parseDouble(strArray[0]);
              }
              } 
          catch (Exception e) {
              e.printStackTrace();
              } 
          finally {
              in.close();
              }
              
          return cpuUsed;
              }

              
          /**
              * 內存監控
              *
              * 
          @return
              * 
          @throws Exception
              
          */
              
          public double getMemUsage() throws Exception {

              
          double menUsed = 0;
              Runtime rt 
          = Runtime.getRuntime();
              Process p 
          = rt.exec(“top --1″);// 調用系統的“top”命令

              BufferedReader in 
          = null;
              
          try {
              in 
          = new BufferedReader(new InputStreamReader(p.getInputStream()));
              String str 
          = null;
              String[] strArray 
          = null;

              
          while ((str = in.readLine()) != null) {
              
          int m = 0;

              
          if (str.indexOf(” R “) != -1) {// 只分析正在運行的進程,top進程本身除外 &&
              
          //
              
          // System.out.println(“——————3—————–”);
              strArray = str.split(” “);
              
          for (String tmp : strArray) {
              
          if (tmp.trim().length() == 0)
              
          continue;

              
          if (++== 10) {
              
          // 9)–第10列為mem的使用百分比(RedHat 9)

              menUsed 
          += Double.parseDouble(tmp);

              }
              }

              }
              }
              } 
          catch (Exception e) {
              e.printStackTrace();
              } 
          finally {
              in.close();
              }
              
          return menUsed;
              }

              
          /**
              * 獲取磁盤空間大小
              *
              * 
          @return
              * 
          @throws Exception
              
          */
              
          public double getDeskUsage() throws Exception {
              
          double totalHD = 0;
              
          double usedHD = 0;
              Runtime rt 
          = Runtime.getRuntime();
              Process p 
          = rt.exec(“df -hl”);// df -hl 查看硬盤空間

              BufferedReader in 
          = null;
              
          try {
              in 
          = new BufferedReader(new InputStreamReader(p.getInputStream()));
              String str 
          = null;
              String[] strArray 
          = null;
              
          while ((str = in.readLine()) != null) {
              
          int m = 0;
              
          // if (flag > 0) {
              
          // flag++;
              strArray = str.split(” “);
              
          for (String tmp : strArray) {
              
          if (tmp.trim().length() == 0)
              
          continue;
              
          ++m;
              
          // System.out.println(“—-tmp—-” + tmp);
              if (tmp.indexOf(“G”) != -1) {
              
          if (m == 2) {
              
          // System.out.println(“—G—-” + tmp);
              if (!tmp.equals(“”) && !tmp.equals(“0″))
              totalHD 
          += Double.parseDouble(tmp.substring(0,
              tmp.length() – 
          1)) * 1024;

              }
              
          if (m == 3) {
              
          // System.out.println(“—G—-” + tmp);
              if (!tmp.equals(“none”) && !tmp.equals(“0″))
              usedHD 
          += Double.parseDouble(tmp.substring(0,
              tmp.length() – 
          1)) * 1024;

              }
              }
              
          if (tmp.indexOf(“M”) != -1) {
              
          if (m == 2) {
              
          // System.out.println(“—M—” + tmp);
              if (!tmp.equals(“”) && !tmp.equals(“0″))
              totalHD 
          += Double.parseDouble(tmp.substring(0,
              tmp.length() – 
          1));

              }
              
          if (m == 3) {
              
          // System.out.println(“—M—” + tmp);
              if (!tmp.equals(“none”) && !tmp.equals(“0″))
              usedHD 
          += Double.parseDouble(tmp.substring(0,
              tmp.length() – 
          1));
              
          // System.out.println(“—-3—-” + usedHD);
              }
              }

              }

              
          // }
              }
              } 
          catch (Exception e) {
              e.printStackTrace();
              } 
          finally {
              in.close();
              }
              
          return (usedHD / totalHD) * 100;
              }
              
          //
              
          //    public static void main(String[] args) throws Exception {
              
          //        NodeLoadView cpu = new NodeLoadView();
              
          //        System.out
              
          //                .println(“—————cpu used:” + cpu.getCpuUsage() + “%”);
              
          //        System.out
              
          //                .println(“—————mem used:” + cpu.getMemUsage() + “%”);
              
          //        System.out
              
          //                .println(“—————HD used:” + cpu.getDeskUsage() + “%”);
              
          //        System.out.println(“————jvm監控———————-”);
              
          //        Runtime lRuntime = Runtime.getRuntime();
              
          //        System.out.println(“————–Free Momery:” + lRuntime.freeMemory()
              
          //                + “K”);
              
          //        System.out.println(“————–Max Momery:” + lRuntime.maxMemory()
              
          //                + “K”);
              
          //        System.out.println(“————–Total Momery:”
              
          //                + lRuntime.totalMemory() + “K”);
              
          //        System.out.println(“—————Available Processors :”
              
          //                + lRuntime.availableProcessors());
              
          //    }
              }

          再來看關鍵的一個類,THreadScheduler:

              
          import java.util.Map;

              
          import org.apache.log4j.Logger;

              
          import test.NodeLoadView;
              
          public class ThreadScheduler {
              
          private static Logger logger = Logger.getLogger(ThreadScheduler.class.getName());
              
          private Map<String, Thread> runningThreadMap;
              
          private Map<String, Thread> waitingThreadMap;
              
          private boolean isFinished = false;
              
          private int runningSize;

              
          public ThreadScheduler (Map<String, Thread> runningThreadMap, Map<String, Thread> waitingThreadMap) {
              
          this.runningThreadMap = runningThreadMap;
              
          this.waitingThreadMap = waitingThreadMap;
              
          this.runningSize = waitingThreadMap.size();
              }

              
          /**
              * 開始調度線程
              * 
          @author zhen.chen
              * @createTime 2010-1-28 上午11:04:52
              
          */
              
          public void schedule(){
              
          long sleepMilliSecond = 1 * 1000;
              
          int allowRunThreads = 15;
              
          // 一次啟動的線程數,cpuLoad變大時以此值為參考遞減
              int allowRunThreadsRef = 15;
              
          double cpuLoad = 0;// 0-15
              NodeLoadView load = new NodeLoadView();

              
          while (true) {
              
          try {
              cpuLoad 
          = load.getCpuUsage();
              } 
          catch (Exception e1) {
              e1.printStackTrace();
              }
              
          // cpuLoad低 啟動的線程多
              allowRunThreads = (int) Math.floor(allowRunThreadsRef – cpuLoad);
              
          // threads不能為0
              if (allowRunThreads < 1) {
              allowRunThreads 
          = 1;
              }
              
          if (allowRunThreads > allowRunThreadsRef) {
              allowRunThreads 
          = allowRunThreadsRef;
              }
              
          if (logger.isDebugEnabled()) {
              logger.debug(“[ThreadScheduler]running Thread:” 
          + runningThreadMap.size() + “; waiting Thread:” + waitingThreadMap.size() + “; cpu:” + cpuLoad + ” allowRunThreads:” + allowRunThreads);
              }

              
          // 檢查runningSize個線程的情況,滿足條件則啟動
              for (int x = 0; x < runningSize; x++) {
              
          if (waitingThreadMap.get(x+") != null) {
              if (allowRunThreadsRef <= runningThreadMap.size()) {
              
          break;
              }
              
          synchronized (waitingThreadMap.get(x+")) {
              if (!waitingThreadMap.get(x+").isAlive()) {
              waitingThreadMap.get(x+").start();
              }else{
              waitingThreadMap.get(x
          +").notify();
              }
              }
              runningThreadMap.put(x
          +", waitingThreadMap.get(x+”"));
              waitingThreadMap.remove(x
          +");
              }
              }
              
          // 檢查runningSize個線程的情況,滿足條件則暫停
              for (int x = 0; x < runningSize; x++) {
              
          if (runningThreadMap.size() <= allowRunThreads) {
              
          break;
              }
              
          if (runningThreadMap.get(x+") != null) {
              synchronized (runningThreadMap.get(x+")) {
              try {
              
          if (runningThreadMap.get(x+").isAlive()) {
              runningThreadMap.get(x+").wait();
              }else{
              
          continue;
              }
              } 
          catch (InterruptedException e) {
              e.printStackTrace();
              }
              }
              waitingThreadMap.put(x
          +", runningThreadMap.get(x));
              runningThreadMap.remove(x+");
              }
              }
              
          // 全部跑完,返回
              if (waitingThreadMap.size() == 0 && runningThreadMap.size() == 0) {
              
          if (logger.isDebugEnabled()) {
              logger.debug(“[ThreadScheduler] over.total Threads size:” 
          + runningSize);
              }
              
          this.isFinished = true;
              
          return;
              }
              
          // 使主while循環慢一點
              try {
              Thread.sleep(sleepMilliSecond);
              } 
          catch (InterruptedException e1) {
              e1.printStackTrace();
              }
              }

              }

              
          public boolean isFinished() {
              
          return isFinished;
              }
              }

          這個類的作用:

          1.接收runningThreadMap和waitingThreadMap兩個map,里面對應存了運行中的線程實例和等待中的線程實例。

          2.讀cpu情況,自動判斷要notify等待中的線程還是wait運行中的線程。

          3.兩個map都結束,退出。(必須runningThreadMap內部的Thread自己將runningThreadMap對應的Thread remove掉)

          如何使用:

              
          public class TestThread {
              
          public static class Runner extends Thread {
              
          public Runner(int j, Map<String, Thread> threadMap) {

              }
              
          public void run() {
              
          // TODO 你的邏輯 完成后需要從threadMap中remove掉
              }
              }

              
          public static void main(String[] args) {
              
          // 運行中的線程
              Map<String, Thread> threadMap = new HashMap<String, Thread>();
              
          // 正在等待中的線程
              Map<String, Thread> waitThreadMap = new HashMap<String, Thread>();
              
          for (int j = 0; j < args.length; j++) {
              Thread t 
          = new Runner(j, threadMap);
              waitThreadMap.put(j 
          + “”, t);
              }

              ThreadScheduler threadScheduler 
          = new ThreadScheduler(threadMap, waitThreadMap);
              threadScheduler.schedule();
              
          if (threadScheduler.isFinished() == false) {
              
          //沒能正常結束
              }
              }
              }


          posted on 2010-08-18 07:59 都市淘沙者 閱讀(1157) 評論(0)  編輯  收藏 所屬分類: 多線程并發編程

          主站蜘蛛池模板: 永新县| 宜兴市| 新乡市| 嘉鱼县| 镇宁| 屯留县| 精河县| 磴口县| 长岛县| 马关县| 云林县| 额济纳旗| 金乡县| 深水埗区| 辽阳市| 淮北市| 温州市| 沙田区| 和静县| 惠东县| 新安县| 抚顺县| 绥芬河市| 建昌县| 资中县| 辽阳县| 同仁县| 遂溪县| 固安县| 理塘县| 洛浦县| 沾化县| 泗阳县| 绵阳市| 南京市| 浦北县| 贵阳市| 那曲县| 当雄县| 凤翔县| 北川|