java線程控制器實現(轉)
java線程控制器代碼分享-根據cpu情況決定線程運行數量和情況
原文地址:請點擊標題即可。在人人網海量存儲系統的存儲引擎部分,為了提高CPU和網絡的使用情況,使用了java多線程管理并行操作的方式。
在java中控制線程是一件很簡單的事情,jdk提供了諸多的方法,其中比常用的兩個是notify()和wait(),一個是喚醒,一個等待線程,在下面的代碼中,將看到一個線程分配器,根據cpu的負載情況,自動完成對應線程的喚醒或者是等待操作。整個過程是一個平滑的過程,不會因為線程的切換而導致機器負載出線鋸齒。
先看一個類,讀取Linux系統TOP等指令拿到系統當前負載:
import java.io.BufferedReader;
import java.io.InputStreamReader;
/**
* 節點的cpu 內存 磁盤空間 情況
*
* @author zhen.chen
*
*/
public class NodeLoadView {
/**
* 獲取cpu使用情況
*
* @return
* @throws Exception
*/
public double getCpuUsage() throws Exception {
double cpuUsed = 0;
Runtime rt = Runtime.getRuntime();
Process p = rt.exec(“/usr/bin/uptime”);// 調用系統的“top”命令
String[] strArray = null;
BufferedReader in = null;
try {
in = new BufferedReader(new InputStreamReader(p.getInputStream()));
String str = null;
while ((str = in.readLine()) != null) {
strArray = str.split(“load average: “);
strArray = strArray[1].split(“,”);
cpuUsed = Double.parseDouble(strArray[0]);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
in.close();
}
return cpuUsed;
}
/**
* 內存監控
*
* @return
* @throws Exception
*/
public double getMemUsage() throws Exception {
double menUsed = 0;
Runtime rt = Runtime.getRuntime();
Process p = rt.exec(“top -b -n 1″);// 調用系統的“top”命令
BufferedReader in = null;
try {
in = new BufferedReader(new InputStreamReader(p.getInputStream()));
String str = null;
String[] strArray = null;
while ((str = in.readLine()) != null) {
int m = 0;
if (str.indexOf(” R “) != -1) {// 只分析正在運行的進程,top進程本身除外 &&
//
// System.out.println(“——————3—————–”);
strArray = str.split(” “);
for (String tmp : strArray) {
if (tmp.trim().length() == 0)
continue;
if (++m == 10) {
// 9)–第10列為mem的使用百分比(RedHat 9)
menUsed += Double.parseDouble(tmp);
}
}
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
in.close();
}
return menUsed;
}
/**
* 獲取磁盤空間大小
*
* @return
* @throws Exception
*/
public double getDeskUsage() throws Exception {
double totalHD = 0;
double usedHD = 0;
Runtime rt = Runtime.getRuntime();
Process p = rt.exec(“df -hl”);// df -hl 查看硬盤空間
BufferedReader in = null;
try {
in = new BufferedReader(new InputStreamReader(p.getInputStream()));
String str = null;
String[] strArray = null;
while ((str = in.readLine()) != null) {
int m = 0;
// if (flag > 0) {
// flag++;
strArray = str.split(” “);
for (String tmp : strArray) {
if (tmp.trim().length() == 0)
continue;
++m;
// System.out.println(“—-tmp—-” + tmp);
if (tmp.indexOf(“G”) != -1) {
if (m == 2) {
// System.out.println(“—G—-” + tmp);
if (!tmp.equals(“”) && !tmp.equals(“0″))
totalHD += Double.parseDouble(tmp.substring(0,
tmp.length() – 1)) * 1024;
}
if (m == 3) {
// System.out.println(“—G—-” + tmp);
if (!tmp.equals(“none”) && !tmp.equals(“0″))
usedHD += Double.parseDouble(tmp.substring(0,
tmp.length() – 1)) * 1024;
}
}
if (tmp.indexOf(“M”) != -1) {
if (m == 2) {
// System.out.println(“—M—” + tmp);
if (!tmp.equals(“”) && !tmp.equals(“0″))
totalHD += Double.parseDouble(tmp.substring(0,
tmp.length() – 1));
}
if (m == 3) {
// System.out.println(“—M—” + tmp);
if (!tmp.equals(“none”) && !tmp.equals(“0″))
usedHD += Double.parseDouble(tmp.substring(0,
tmp.length() – 1));
// System.out.println(“—-3—-” + usedHD);
}
}
}
// }
}
} catch (Exception e) {
e.printStackTrace();
} finally {
in.close();
}
return (usedHD / totalHD) * 100;
}
//
// public static void main(String[] args) throws Exception {
// NodeLoadView cpu = new NodeLoadView();
// System.out
// .println(“—————cpu used:” + cpu.getCpuUsage() + “%”);
// System.out
// .println(“—————mem used:” + cpu.getMemUsage() + “%”);
// System.out
// .println(“—————HD used:” + cpu.getDeskUsage() + “%”);
// System.out.println(“————jvm監控———————-”);
// Runtime lRuntime = Runtime.getRuntime();
// System.out.println(“————–Free Momery:” + lRuntime.freeMemory()
// + “K”);
// System.out.println(“————–Max Momery:” + lRuntime.maxMemory()
// + “K”);
// System.out.println(“————–Total Momery:”
// + lRuntime.totalMemory() + “K”);
// System.out.println(“—————Available Processors :”
// + lRuntime.availableProcessors());
// }
}
再來看關鍵的一個類,THreadScheduler:
import java.util.Map;
import org.apache.log4j.Logger;
import test.NodeLoadView;
public class ThreadScheduler {
private static Logger logger = Logger.getLogger(ThreadScheduler.class.getName());
private Map<String, Thread> runningThreadMap;
private Map<String, Thread> waitingThreadMap;
private boolean isFinished = false;
private int runningSize;
public ThreadScheduler (Map<String, Thread> runningThreadMap, Map<String, Thread> waitingThreadMap) {
this.runningThreadMap = runningThreadMap;
this.waitingThreadMap = waitingThreadMap;
this.runningSize = waitingThreadMap.size();
}
/**
* 開始調度線程
* @author zhen.chen
* @createTime 2010-1-28 上午11:04:52
*/
public void schedule(){
long sleepMilliSecond = 1 * 1000;
int allowRunThreads = 15;
// 一次啟動的線程數,cpuLoad變大時以此值為參考遞減
int allowRunThreadsRef = 15;
double cpuLoad = 0;// 0-15
NodeLoadView load = new NodeLoadView();
while (true) {
try {
cpuLoad = load.getCpuUsage();
} catch (Exception e1) {
e1.printStackTrace();
}
// cpuLoad低 啟動的線程多
allowRunThreads = (int) Math.floor(allowRunThreadsRef – cpuLoad);
// threads不能為0
if (allowRunThreads < 1) {
allowRunThreads = 1;
}
if (allowRunThreads > allowRunThreadsRef) {
allowRunThreads = allowRunThreadsRef;
}
if (logger.isDebugEnabled()) {
logger.debug(“[ThreadScheduler]running Thread:” + runningThreadMap.size() + “; waiting Thread:” + waitingThreadMap.size() + “; cpu:” + cpuLoad + ” allowRunThreads:” + allowRunThreads);
}
// 檢查runningSize個線程的情況,滿足條件則啟動
for (int x = 0; x < runningSize; x++) {
if (waitingThreadMap.get(x+”") != null) {
if (allowRunThreadsRef <= runningThreadMap.size()) {
break;
}
synchronized (waitingThreadMap.get(x+”")) {
if (!waitingThreadMap.get(x+”").isAlive()) {
waitingThreadMap.get(x+”").start();
}else{
waitingThreadMap.get(x+”").notify();
}
}
runningThreadMap.put(x+”", waitingThreadMap.get(x+”"));
waitingThreadMap.remove(x+”");
}
}
// 檢查runningSize個線程的情況,滿足條件則暫停
for (int x = 0; x < runningSize; x++) {
if (runningThreadMap.size() <= allowRunThreads) {
break;
}
if (runningThreadMap.get(x+”") != null) {
synchronized (runningThreadMap.get(x+”")) {
try {
if (runningThreadMap.get(x+”").isAlive()) {
runningThreadMap.get(x+”").wait();
}else{
continue;
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
waitingThreadMap.put(x+”", runningThreadMap.get(x));
runningThreadMap.remove(x+”");
}
}
// 全部跑完,返回
if (waitingThreadMap.size() == 0 && runningThreadMap.size() == 0) {
if (logger.isDebugEnabled()) {
logger.debug(“[ThreadScheduler] over.total Threads size:” + runningSize);
}
this.isFinished = true;
return;
}
// 使主while循環慢一點
try {
Thread.sleep(sleepMilliSecond);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
}
}
public boolean isFinished() {
return isFinished;
}
}
這個類的作用:
1.接收runningThreadMap和waitingThreadMap兩個map,里面對應存了運行中的線程實例和等待中的線程實例。
2.讀cpu情況,自動判斷要notify等待中的線程還是wait運行中的線程。
3.兩個map都結束,退出。(必須runningThreadMap內部的Thread自己將runningThreadMap對應的Thread remove掉)
如何使用:
public class TestThread {
public static class Runner extends Thread {
public Runner(int j, Map<String, Thread> threadMap) {
}
public void run() {
// TODO 你的邏輯 完成后需要從threadMap中remove掉
}
}
public static void main(String[] args) {
// 運行中的線程
Map<String, Thread> threadMap = new HashMap<String, Thread>();
// 正在等待中的線程
Map<String, Thread> waitThreadMap = new HashMap<String, Thread>();
for (int j = 0; j < args.length; j++) {
Thread t = new Runner(j, threadMap);
waitThreadMap.put(j + “”, t);
}
ThreadScheduler threadScheduler = new ThreadScheduler(threadMap, waitThreadMap);
threadScheduler.schedule();
if (threadScheduler.isFinished() == false) {
//沒能正常結束
}
}
}
posted on 2010-08-18 07:59 都市淘沙者 閱讀(1157) 評論(0) 編輯 收藏 所屬分類: 多線程并發編程