/**
* author:annegu
* date:2009-07-16
*/
annegu做了一個(gè)簡單的Http多線程的下載程序,來討論一下多線程并發(fā)下載以及斷點(diǎn)續(xù)傳的問題。
這個(gè)程序的功能,就是可以分多個(gè)線程從目標(biāo)地址上下載數(shù)據(jù),每個(gè)線程負(fù)責(zé)下載一部分,并可以支持?jǐn)帱c(diǎn)續(xù)傳和超時(shí)重連。
下載的方法是download(),它接收兩個(gè)參數(shù),分別是要下載的頁面的url和編碼方式。在這個(gè)負(fù)責(zé)下載的方法中,主要分了三個(gè)步驟。第一步是用來設(shè)置斷點(diǎn)續(xù)傳時(shí)候的一些信息的,第二步就是主要的分多線程來下載了,最后是數(shù)據(jù)的合并。
1、多線程下載:
/** http://www.bt285.cn http://www.5a520.cn
*/
public String download(String urlStr, String charset) {
this.charset = charset;
long contentLength = 0;
CountDownLatch latch = new CountDownLatch(threadNum);
long[] startPos = new long[threadNum];
long endPos = 0;
try {
// 從url中獲得下載的文件格式與名字
this.fileName = urlStr.substring(urlStr.lastIndexOf("/") + 1);
this.url = new URL(urlStr);
URLConnection con = url.openConnection();
setHeader(con);
// 得到content的長度
contentLength = con.getContentLength();
// 把context分為threadNum段的話,每段的長度。
this.threadLength = contentLength / threadNum;
// 第一步,分析已下載的臨時(shí)文件,設(shè)置斷點(diǎn),如果是新的下載任務(wù),則建立目標(biāo)文件。在第4點(diǎn)中說明。
startPos = setThreadBreakpoint(fileDir, fileName, contentLength, startPos);
//第二步,分多個(gè)線程下載文件
ExecutorService exec = Executors.newCachedThreadPool();
for (int i = 0; i < threadNum; i++) {
// 創(chuàng)建子線程來負(fù)責(zé)下載數(shù)據(jù),每段數(shù)據(jù)的起始位置為(threadLength * i + 已下載長度)
startPos[i] += threadLength * i;
/*設(shè)置子線程的終止位置,非最后一個(gè)線程即為(threadLength * (i + 1) - 1)
最后一個(gè)線程的終止位置即為下載內(nèi)容的長度*/
if (i == threadNum - 1) {
endPos = contentLength;
} else {
endPos = threadLength * (i + 1) - 1;
}
// 開啟子線程,并執(zhí)行。
ChildThread thread = new ChildThread(this, latch, i, startPos[i], endPos);
childThreads[i] = thread;
exec.execute(thread);
}
try {
// 等待CountdownLatch信號為0,表示所有子線程都結(jié)束。
latch.await();
exec.shutdown();
// 第三步,把分段下載下來的臨時(shí)文件中的內(nèi)容寫入目標(biāo)文件中。在第3點(diǎn)中說明。
tempFileToTargetFile(childThreads);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
首先來看最主要的步驟:多線程下載。
首先從url中提取目標(biāo)文件的名稱,并在對應(yīng)的目錄創(chuàng)建文件。然后取得要下載的文件大小,根據(jù)分成的下載線程數(shù)量平均分配每個(gè)線程需要下載的數(shù)據(jù)量,就是threadLength。然后就可以分多個(gè)線程來進(jìn)行下載任務(wù)了。
在這個(gè)例子中,并沒有直接顯示的創(chuàng)建Thread對象,而是用Executor來管理Thread對象,并且用CachedThreadPool來創(chuàng)建的線程池,當(dāng)然也可以用FixedThreadPool。CachedThreadPool在程序執(zhí)行的過程中會(huì)創(chuàng)建與所需數(shù)量相同的線程,當(dāng)程序回收舊線程的時(shí)候就停止創(chuàng)建新線程。FixedThreadPool可以預(yù)先新建參數(shù)給定個(gè)數(shù)的線程,這樣就不用在創(chuàng)建任務(wù)的時(shí)候再來創(chuàng)建線程了,可以直接從線程池中取出已準(zhǔn)備好的線程。下載線程的數(shù)量是通過一個(gè)全局變量threadNum來控制的,默認(rèn)為5。
好了,這5個(gè)子線程已經(jīng)通過Executor來創(chuàng)建了,下面它們就會(huì)各自為政,互不干涉的執(zhí)行了。線程有兩種實(shí)現(xiàn)方式:實(shí)現(xiàn)Runnable接口;繼承Thread類。
ChildThread就是子線程,它作為DownloadTask的內(nèi)部類,繼承了Thread,它的構(gòu)造方法需要5個(gè)參數(shù),依次是一個(gè)對DownloadTask的引用,一個(gè)CountDownLatch,id(標(biāo)識線程的id號),startPosition(下載內(nèi)容的開始位置),endPosition(下載內(nèi)容的結(jié)束位置)。
這個(gè)CountDownLatch是做什么用的呢?
現(xiàn)在我們整理一下思路,要實(shí)現(xiàn)分多個(gè)線程來下載數(shù)據(jù)的話,我們肯定還要把這多個(gè)線程下載下來的數(shù)據(jù)進(jìn)行合。主線程必須等待所有的子線程都執(zhí)行結(jié)束之后,才能把所有子線程的下載數(shù)據(jù)按照各自的id順序進(jìn)行合并。CountDownLatch就是來做這個(gè)工作的。
CountDownLatch用來同步主線程,強(qiáng)制主線程等待所有的子線程執(zhí)行的下載操作完成。在主線程中,CountDownLatch對象被設(shè)置了一個(gè)初始計(jì)數(shù)器,就是子線程的個(gè)數(shù)5個(gè),代碼①處。在新建了5個(gè)子線程并開始執(zhí)行之后,主線程用CountDownLatch的await()方法來阻塞主線程,直到這個(gè)計(jì)數(shù)器的值到達(dá)0,才會(huì)進(jìn)行下面的操作,代碼②處。
對每個(gè)子線程來說,在執(zhí)行完下載指定區(qū)間與長度的數(shù)據(jù)之后,必須通過調(diào)用CountDownLatch的countDown()方法來把這個(gè)計(jì)數(shù)器減1。
2、在全面開啟下載任務(wù)之后,主線程就開始阻塞,等待子線程執(zhí)行完畢,所以下面我們來看一下具體的下載線程ChildThread。
/**
*author by http://www.5a520.cn http://www.feng123.com
*/
public class ChildThread extends Thread {
public static final int STATUS_HASNOT_FINISHED = 0;
public static final int STATUS_HAS_FINISHED = 1;
public static final int STATUS_HTTPSTATUS_ERROR = 2;
private DownloadTask task;
private int id;
private long startPosition;
private long endPosition;
private final CountDownLatch latch;
private File tempFile = null;
//線程狀態(tài)碼
private int status = ChildThread.STATUS_HASNOT_FINISHED;
public ChildThread(DownloadTask task, CountDownLatch latch, int id, long startPos, long endPos) {
super();
this.task = task;
this.id = id;
this.startPosition = startPos;
this.endPosition = endPos;
this.latch = latch;
try {
tempFile = new File(this.task.fileDir + this.task.fileName + "_" + id);
if(!tempFile.exists()){
tempFile.createNewFile();
}
} catch (IOException e) {
e.printStackTrace();
}
}
public void run() {
System.out.println("Thread " + id + " run
");
HttpURLConnection con = null;
InputStream inputStream = null;
BufferedOutputStream outputStream = null;
int count = 0;
long threadDownloadLength = endPosition - startPosition;
try {
outputStream = new BufferedOutputStream(new FileOutputStream(tempFile.getPath(), true));
} catch (FileNotFoundException e2) {
e2.printStackTrace();
}
③ for(;;){
④ startPosition += count;
try {
//打開URLConnection
con = (HttpURLConnection) task.url.openConnection();
setHeader(con);
con.setAllowUserInteraction(true);
//設(shè)置連接超時(shí)時(shí)間為10000ms
⑤ con.setConnectTimeout(10000);
//設(shè)置讀取數(shù)據(jù)超時(shí)時(shí)間為10000ms
con.setReadTimeout(10000);
if(startPosition < endPosition){
//設(shè)置下載數(shù)據(jù)的起止區(qū)間
con.setRequestProperty("Range", "bytes=" + startPosition + "-"
+ endPosition);
System.out.println("Thread " + id + " startPosition is " + startPosition);
System.out.println("Thread " + id + " endPosition is " + endPosition);
//判斷http status是否為HTTP/1.1 206 Partial Content或者200 OK
//如果不是以上兩種狀態(tài),把status改為STATUS_HTTPSTATUS_ERROR
⑥ if (con.getResponseCode() != HttpURLConnection.HTTP_OK
&& con.getResponseCode() != HttpURLConnection.HTTP_PARTIAL) {
System.out.println("Thread " + id + ": code = "
+ con.getResponseCode() + ", status = "
+ con.getResponseMessage());
status = ChildThread.STATUS_HTTPSTATUS_ERROR;
this.task.statusError = true;
outputStream.close();
con.disconnect();
System.out.println("Thread " + id + " finished.");
latch.countDown();
break;
}
inputStream = con.getInputStream();
int len = 0;
byte[] b = new byte[1024];
while ((len = inputStream.read(b)) != -1) {
outputStream.write(b, 0, len);
count += len;
//每讀滿5000個(gè)byte,往磁盤上flush一下
if(count % 5000 == 0){
⑦ outputStream.flush();
}
}
System.out.println("count is " + count);
if(count >= threadDownloadLength){
hasFinished = true;
}
⑧ outputStream.flush();
outputStream.close();
inputStream.close();
con.disconnect();
}
System.out.println("Thread " + id + " finished.");
latch.countDown();
break;
} catch (IOException e) {
try {
⑨ outputStream.flush();
⑩ TimeUnit.SECONDS.sleep(getSleepSeconds());
} catch (InterruptedException e1) {
e1.printStackTrace();
} catch (IOException e2) {
e2.printStackTrace();
}
continue;
}
}
}
}
在ChildThread的構(gòu)造方法中,除了設(shè)置一些從主線程中帶來的id, 起始位置之外,就是新建了一個(gè)臨時(shí)文件用來存放當(dāng)前線程的下載數(shù)據(jù)。臨時(shí)文件的命名規(guī)則是這樣的:下載的目標(biāo)文件名+”_”+線程編號。
現(xiàn)在讓我們來看看從網(wǎng)絡(luò)中讀數(shù)據(jù)是怎么讀的。我們通過URLConnection來獲得一個(gè)http的連接。有些網(wǎng)站為了安全起見,會(huì)對請求的http連接進(jìn)行過濾,因此為了偽裝這個(gè)http的連接請求,我們給httpHeader穿一件偽裝服。下面的setHeader方法展示了一些非常常用的典型的httpHeader的偽裝方法。比較重要的有:Uer-Agent模擬從Ubuntu的firefox瀏覽器發(fā)出的請求;Referer模擬瀏覽器請求的前一個(gè)觸發(fā)頁面,例如從skycn站點(diǎn)來下載軟件的話,Referer設(shè)置成skycn的首頁域名就可以了;Range就是這個(gè)連接獲取的流文件的起始區(qū)間。
private void setHeader(URLConnection con) {
con.setRequestProperty("User-Agent", "Mozilla/5.0 (X11; U; Linux i686; en-US; rv:1.9.0.3) Gecko/2008092510 Ubuntu/8.04 (hardy) Firefox/3.0.3");
con.setRequestProperty("Accept-Language", "en-us,en;q=0.7,zh-cn;q=0.3");
con.setRequestProperty("Accept-Encoding", "aa");
con.setRequestProperty("Accept-Charset", "ISO-8859-1,utf-8;q=0.7,*;q=0.7");
con.setRequestProperty("Keep-Alive", "300");
con.setRequestProperty("Connection", "keep-alive");
con.setRequestProperty("If-Modified-Since", "Fri, 02 Jan 2009 17:00:05 GMT");
con.setRequestProperty("If-None-Match", "\"1261d8-4290-df64d224\"");
con.setRequestProperty("Cache-Control", "max-age=0");
con.setRequestProperty("Referer", "http://http://www.bt285.cn");
}
另外,為了避免線程因?yàn)榫W(wǎng)絡(luò)原因而阻塞,設(shè)置了ConnectTimeout和ReadTimeout,代碼⑤、⑥處。setConnectTimeout設(shè)置的連接的超時(shí)時(shí)間,而setReadTimeout設(shè)置的是讀取數(shù)據(jù)的超時(shí)時(shí)間,發(fā)生超時(shí)的話,就會(huì)拋出socketTimeout異常,兩個(gè)方法的參數(shù)都是超時(shí)的毫秒數(shù)。
這里對超時(shí)的發(fā)生,采用的是等候一段時(shí)間重新連接的方法。整個(gè)獲取網(wǎng)絡(luò)連接并讀取下載數(shù)據(jù)的過程都包含在一個(gè)循環(huán)之中(代碼③處),如果發(fā)生了連接或者讀取數(shù)據(jù)的超時(shí),在拋出的異常里面就會(huì)sleep一定的時(shí)間(代碼⑩處),然后continue,再次嘗試獲取連接并讀取數(shù)據(jù),這個(gè)時(shí)間可以通過setSleepSeconds()方法來設(shè)置。我們在迅雷等下載工具的使用中,經(jīng)??梢钥吹綘顟B(tài)欄會(huì)輸出類似“連接超時(shí),等待*秒后重試”的話,這個(gè)就是通過ConnectTimeout,ReadTimeout來實(shí)現(xiàn)的。
連接建立好之后,我們要檢查一下返回響應(yīng)的狀態(tài)碼。常見的Http Response Code有以下幾種:
a) 200 OK 一切正常,對GET和POST請求的應(yīng)答文檔跟在后面。
b) 206 Partial Content 客戶發(fā)送了一個(gè)帶有Range頭的GET請求,服務(wù)器完成。
c) 404 Not Found 無法找到指定位置的資源。這也是一個(gè)常用的應(yīng)答。
d) 414 Request URI Too Long URI太長。
e) 416 Requested Range Not Satisfiable 服務(wù)器不能滿足客戶在請求中指定的Range頭。
f) 500 Internal Server Error 服務(wù)器遇到了意料不到的情況,不能完成客戶的請求。
g) 503 Service Unavailable 服務(wù)器由于維護(hù)或者負(fù)載過重未能應(yīng)答。例如,Servlet可能在數(shù)據(jù)庫連接池已滿的情況下返回503。
在這些狀態(tài)里面,只有200與206才是我們需要的正確的狀態(tài)。所以在代碼⑥處,進(jìn)行了狀態(tài)碼的判斷,如果返回不符合要求的狀態(tài)碼,則結(jié)束線程,返回主線程并提示報(bào)錯(cuò)。
假設(shè)一切正常,下面我們就要考慮從網(wǎng)絡(luò)中讀數(shù)據(jù)了。正如我之前在分析mysql的數(shù)據(jù)庫驅(qū)動(dòng)中看的一樣,網(wǎng)絡(luò)中發(fā)送數(shù)據(jù)都是以數(shù)據(jù)包的形式來發(fā)送的,也就是說不管是客戶端向服務(wù)器發(fā)出的請求數(shù)據(jù),還是從服務(wù)器返回給客戶端的響應(yīng)數(shù)據(jù),都會(huì)被拆分成若干個(gè)小型數(shù)據(jù)包在網(wǎng)絡(luò)中傳遞,等數(shù)據(jù)包到達(dá)了目的地,網(wǎng)絡(luò)接口會(huì)依據(jù)數(shù)據(jù)包的編號來組裝它們,成為完整的比特?cái)?shù)據(jù)。因此,我們可以想到在這里也是一樣的,我們用inputStream的read方法來通過網(wǎng)卡從網(wǎng)絡(luò)中讀取數(shù)據(jù),并不一定一次就能把所有的數(shù)據(jù)包都讀完,所以我們要不斷的循環(huán)來從inputStream中讀取數(shù)據(jù)。Read方法有一個(gè)int型的返回值,表示每次從inputStream中讀取的字節(jié)數(shù),如果把這個(gè)inputStream中的數(shù)據(jù)讀完了,那么就返回-1。
Read方法最多可以有三個(gè)參數(shù),byte b[]是讀取數(shù)據(jù)之后存放的目標(biāo)數(shù)組,off標(biāo)識了目標(biāo)數(shù)組中存儲(chǔ)的開始位置,len是想要讀取的數(shù)據(jù)長度,這個(gè)長度必定不能大于b[]的長度。
public synchronized int read(byte b[], int off, int len);
我們的目標(biāo)是要把目標(biāo)地址的內(nèi)容下載下來,現(xiàn)在分了5個(gè)線程來分段下載,那么這些分段下載的數(shù)據(jù)保存在哪里呢?如果把它們都保存在內(nèi)存中是非常糟糕的做法,如果文件相當(dāng)之大,例如是一個(gè)視頻的話,難道把這么大的數(shù)據(jù)都放在內(nèi)存中嗎,這樣的話,萬一連接中斷,那前面下載的東西就都沒有了?我們當(dāng)然要想辦法及時(shí)的把下載的數(shù)據(jù)刷到磁盤上保存下來。當(dāng)用bt下載視頻的時(shí)候,通常都會(huì)有個(gè)臨時(shí)文件,當(dāng)視頻完全下載結(jié)束之后,這個(gè)臨時(shí)文件就會(huì)被刪除,那么下次繼續(xù)下載的時(shí)候,就會(huì)接著上次下載的點(diǎn)繼續(xù)下載。所以我們的outputStream就是往這個(gè)臨時(shí)文件來輸出了。
OutputStream的write方法和上面InputStream的read方法有類似的參數(shù),byte b[]是輸出數(shù)據(jù)的來源,off標(biāo)識了開始位置,len是數(shù)據(jù)長度。
public synchronized void write(byte b[], int off, int len) throws IOException;
在往臨時(shí)文件的outputStream中寫數(shù)據(jù)的時(shí)候,我會(huì)加上一個(gè)計(jì)數(shù)器,每滿5000個(gè)比特就往文件中flush一下(代碼⑦處)。
對于輸出流的flush,有些要注意的地方,在程序中有三個(gè)地方調(diào)用了outputStream.flush()。第一個(gè)是在循環(huán)的讀取網(wǎng)絡(luò)數(shù)據(jù)并往outputStream中寫入的時(shí)候,每滿5000個(gè)byte就flush一下(代碼⑦處);第二個(gè)是循環(huán)之后(代碼⑧處),這時(shí)候正常的讀取寫入操作已經(jīng)完成,但是outputStream中還有沒有刷入磁盤的數(shù)據(jù),所以要flush一下才能關(guān)閉連接;第三個(gè)就是在異常中的flush(代碼⑨處),因?yàn)槿绻l(fā)生了連接超時(shí)或者讀取數(shù)據(jù)超時(shí)的話,就會(huì)直接跑到catch的exception中去,這個(gè)時(shí)候outputStream中的數(shù)據(jù)如果不flush的話,重新連接的時(shí)候這部分?jǐn)?shù)據(jù)就會(huì)丟失了。另外,當(dāng)拋出異常,重新連接的時(shí)候,下載的起始位置也要重新設(shè)置(代碼④處),count就是用來標(biāo)識已經(jīng)下載的字節(jié)數(shù)的,把count+startPosition就是新一次連接需要的下載起始位置了。
3、現(xiàn)在每個(gè)分段的下載線程都順利結(jié)束了,也都創(chuàng)建了相應(yīng)的臨時(shí)文件,接下來在主線程中會(huì)對臨時(shí)文件進(jìn)行合并,并寫入目標(biāo)文件,最后刪除臨時(shí)文件。這部分很簡單,就是一個(gè)對所有下載線程進(jìn)行遍歷的過程。這里outputStream也有兩次flush,與上面類似,不再贅述。
/**author by http://www.bt285.cn http://www.guihua.org */
private void tempFileToTargetFile(ChildThread[] childThreads) {
try {
BufferedOutputStream outputStream = new BufferedOutputStream(
new FileOutputStream(fileDir + fileName));
// 遍歷所有子線程創(chuàng)建的臨時(shí)文件,按順序把下載內(nèi)容寫入目標(biāo)文件中
for (int i = 0; i < threadNum; i++) {
if (statusError) {
for (int k = 0; k < threadNum; k++) {
if (childThreads[k].tempFile.length() == 0)
childThreads[k].tempFile.delete();
}
System.out.println("本次下載任務(wù)不成功,請重新設(shè)置線程數(shù)。");
break;
}
BufferedInputStream inputStream = new BufferedInputStream(
new FileInputStream(childThreads[i].tempFile));
System.out.println("Now is file " + childThreads[i].id);
int len = 0;
int count = 0;
byte[] b = new byte[1024];
while ((len = inputStream.read(b)) != -1) {
count += len;
outputStream.write(b, 0, len);
if ((count % 5000) == 0) {
outputStream.flush();
}
// b = new byte[1024];
}
inputStream.close();
// 刪除臨時(shí)文件
if (childThreads[i].status == ChildThread.STATUS_HAS_FINISHED) {
childThreads[i].tempFile.delete();
}
}
outputStream.flush();
outputStream.close();
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
4、最后,說說斷點(diǎn)續(xù)傳,前面為了實(shí)現(xiàn)斷點(diǎn)續(xù)傳,在每個(gè)下載線程中都創(chuàng)建了一個(gè)臨時(shí)文件,現(xiàn)在我們就要利用這個(gè)臨時(shí)文件來設(shè)置斷點(diǎn)的位置。由于臨時(shí)文件的命名方式都是固定的,所以我們就專門找對應(yīng)下載的目標(biāo)文件的臨時(shí)文件,臨時(shí)文件中已經(jīng)下載的字節(jié)數(shù)就是我們需要的斷點(diǎn)位置。startPos是一個(gè)數(shù)組,存放了每個(gè)線程的已下載的字節(jié)數(shù)。
//第一步,分析已下載的臨時(shí)文件,設(shè)置斷點(diǎn),如果是新的下載任務(wù),則建立目標(biāo)文件。

private long[] setThreadBreakpoint(String fileDir2, String fileName2,
long contentLength, long[] startPos) {
File file = new File(fileDir + fileName);
long localFileSize = file.length();
if (file.exists()) {
System.out.println("file " + fileName + " has exists!");
// 下載的目標(biāo)文件已存在,判斷目標(biāo)文件是否完整
if (localFileSize < contentLength) {
System.out.println("Now download continue
");
// 遍歷目標(biāo)文件的所有臨時(shí)文件,設(shè)置斷點(diǎn)的位置,即每個(gè)臨時(shí)文件的長度
File tempFileDir = new File(fileDir);
File[] files = tempFileDir.listFiles();
for (int k = 0; k < files.length; k++) {
String tempFileName = files[k].getName();
// 臨時(shí)文件的命名方式為:目標(biāo)文件名+"_"+編號
if (tempFileName != null && files[k].length() > 0
&& tempFileName.startsWith(fileName + "_")) {
int fileLongNum = Integer.parseInt(tempFileName
.substring(tempFileName.lastIndexOf("_") + 1,
tempFileName.lastIndexOf("_") + 2));
// 為每個(gè)線程設(shè)置已下載的位置
startPos[fileLongNum] = files[k].length();
}
}
}
} else {
// 如果下載的目標(biāo)文件不存在,則創(chuàng)建新文件
try {
file.createNewFile();
} catch (IOException e) {
e.printStackTrace();
}
}
return startPos;
}
5、測試
public class DownloadStartup {
private static final String encoding = "utf-8";
public static void main(String[] args) {
DownloadTask downloadManager = new DownloadTask();
String urlStr = "http://apache.freelamp.com/velocity/tools/1.4/velocity-tools-1.4.zip";
downloadManager.setSleepSeconds(5);
downloadManager.download(urlStr, encoding);
}
}
* author:annegu
* date:2009-07-16
*/
annegu做了一個(gè)簡單的Http多線程的下載程序,來討論一下多線程并發(fā)下載以及斷點(diǎn)續(xù)傳的問題。
這個(gè)程序的功能,就是可以分多個(gè)線程從目標(biāo)地址上下載數(shù)據(jù),每個(gè)線程負(fù)責(zé)下載一部分,并可以支持?jǐn)帱c(diǎn)續(xù)傳和超時(shí)重連。
下載的方法是download(),它接收兩個(gè)參數(shù),分別是要下載的頁面的url和編碼方式。在這個(gè)負(fù)責(zé)下載的方法中,主要分了三個(gè)步驟。第一步是用來設(shè)置斷點(diǎn)續(xù)傳時(shí)候的一些信息的,第二步就是主要的分多線程來下載了,最后是數(shù)據(jù)的合并。
1、多線程下載:
























































首先從url中提取目標(biāo)文件的名稱,并在對應(yīng)的目錄創(chuàng)建文件。然后取得要下載的文件大小,根據(jù)分成的下載線程數(shù)量平均分配每個(gè)線程需要下載的數(shù)據(jù)量,就是threadLength。然后就可以分多個(gè)線程來進(jìn)行下載任務(wù)了。
在這個(gè)例子中,并沒有直接顯示的創(chuàng)建Thread對象,而是用Executor來管理Thread對象,并且用CachedThreadPool來創(chuàng)建的線程池,當(dāng)然也可以用FixedThreadPool。CachedThreadPool在程序執(zhí)行的過程中會(huì)創(chuàng)建與所需數(shù)量相同的線程,當(dāng)程序回收舊線程的時(shí)候就停止創(chuàng)建新線程。FixedThreadPool可以預(yù)先新建參數(shù)給定個(gè)數(shù)的線程,這樣就不用在創(chuàng)建任務(wù)的時(shí)候再來創(chuàng)建線程了,可以直接從線程池中取出已準(zhǔn)備好的線程。下載線程的數(shù)量是通過一個(gè)全局變量threadNum來控制的,默認(rèn)為5。
好了,這5個(gè)子線程已經(jīng)通過Executor來創(chuàng)建了,下面它們就會(huì)各自為政,互不干涉的執(zhí)行了。線程有兩種實(shí)現(xiàn)方式:實(shí)現(xiàn)Runnable接口;繼承Thread類。
ChildThread就是子線程,它作為DownloadTask的內(nèi)部類,繼承了Thread,它的構(gòu)造方法需要5個(gè)參數(shù),依次是一個(gè)對DownloadTask的引用,一個(gè)CountDownLatch,id(標(biāo)識線程的id號),startPosition(下載內(nèi)容的開始位置),endPosition(下載內(nèi)容的結(jié)束位置)。
這個(gè)CountDownLatch是做什么用的呢?
現(xiàn)在我們整理一下思路,要實(shí)現(xiàn)分多個(gè)線程來下載數(shù)據(jù)的話,我們肯定還要把這多個(gè)線程下載下來的數(shù)據(jù)進(jìn)行合。主線程必須等待所有的子線程都執(zhí)行結(jié)束之后,才能把所有子線程的下載數(shù)據(jù)按照各自的id順序進(jìn)行合并。CountDownLatch就是來做這個(gè)工作的。
CountDownLatch用來同步主線程,強(qiáng)制主線程等待所有的子線程執(zhí)行的下載操作完成。在主線程中,CountDownLatch對象被設(shè)置了一個(gè)初始計(jì)數(shù)器,就是子線程的個(gè)數(shù)5個(gè),代碼①處。在新建了5個(gè)子線程并開始執(zhí)行之后,主線程用CountDownLatch的await()方法來阻塞主線程,直到這個(gè)計(jì)數(shù)器的值到達(dá)0,才會(huì)進(jìn)行下面的操作,代碼②處。
對每個(gè)子線程來說,在執(zhí)行完下載指定區(qū)間與長度的數(shù)據(jù)之后,必須通過調(diào)用CountDownLatch的countDown()方法來把這個(gè)計(jì)數(shù)器減1。
2、在全面開啟下載任務(wù)之后,主線程就開始阻塞,等待子線程執(zhí)行完畢,所以下面我們來看一下具體的下載線程ChildThread。































































































































現(xiàn)在讓我們來看看從網(wǎng)絡(luò)中讀數(shù)據(jù)是怎么讀的。我們通過URLConnection來獲得一個(gè)http的連接。有些網(wǎng)站為了安全起見,會(huì)對請求的http連接進(jìn)行過濾,因此為了偽裝這個(gè)http的連接請求,我們給httpHeader穿一件偽裝服。下面的setHeader方法展示了一些非常常用的典型的httpHeader的偽裝方法。比較重要的有:Uer-Agent模擬從Ubuntu的firefox瀏覽器發(fā)出的請求;Referer模擬瀏覽器請求的前一個(gè)觸發(fā)頁面,例如從skycn站點(diǎn)來下載軟件的話,Referer設(shè)置成skycn的首頁域名就可以了;Range就是這個(gè)連接獲取的流文件的起始區(qū)間。












這里對超時(shí)的發(fā)生,采用的是等候一段時(shí)間重新連接的方法。整個(gè)獲取網(wǎng)絡(luò)連接并讀取下載數(shù)據(jù)的過程都包含在一個(gè)循環(huán)之中(代碼③處),如果發(fā)生了連接或者讀取數(shù)據(jù)的超時(shí),在拋出的異常里面就會(huì)sleep一定的時(shí)間(代碼⑩處),然后continue,再次嘗試獲取連接并讀取數(shù)據(jù),這個(gè)時(shí)間可以通過setSleepSeconds()方法來設(shè)置。我們在迅雷等下載工具的使用中,經(jīng)??梢钥吹綘顟B(tài)欄會(huì)輸出類似“連接超時(shí),等待*秒后重試”的話,這個(gè)就是通過ConnectTimeout,ReadTimeout來實(shí)現(xiàn)的。
連接建立好之后,我們要檢查一下返回響應(yīng)的狀態(tài)碼。常見的Http Response Code有以下幾種:
a) 200 OK 一切正常,對GET和POST請求的應(yīng)答文檔跟在后面。
b) 206 Partial Content 客戶發(fā)送了一個(gè)帶有Range頭的GET請求,服務(wù)器完成。
c) 404 Not Found 無法找到指定位置的資源。這也是一個(gè)常用的應(yīng)答。
d) 414 Request URI Too Long URI太長。
e) 416 Requested Range Not Satisfiable 服務(wù)器不能滿足客戶在請求中指定的Range頭。
f) 500 Internal Server Error 服務(wù)器遇到了意料不到的情況,不能完成客戶的請求。
g) 503 Service Unavailable 服務(wù)器由于維護(hù)或者負(fù)載過重未能應(yīng)答。例如,Servlet可能在數(shù)據(jù)庫連接池已滿的情況下返回503。
在這些狀態(tài)里面,只有200與206才是我們需要的正確的狀態(tài)。所以在代碼⑥處,進(jìn)行了狀態(tài)碼的判斷,如果返回不符合要求的狀態(tài)碼,則結(jié)束線程,返回主線程并提示報(bào)錯(cuò)。
假設(shè)一切正常,下面我們就要考慮從網(wǎng)絡(luò)中讀數(shù)據(jù)了。正如我之前在分析mysql的數(shù)據(jù)庫驅(qū)動(dòng)中看的一樣,網(wǎng)絡(luò)中發(fā)送數(shù)據(jù)都是以數(shù)據(jù)包的形式來發(fā)送的,也就是說不管是客戶端向服務(wù)器發(fā)出的請求數(shù)據(jù),還是從服務(wù)器返回給客戶端的響應(yīng)數(shù)據(jù),都會(huì)被拆分成若干個(gè)小型數(shù)據(jù)包在網(wǎng)絡(luò)中傳遞,等數(shù)據(jù)包到達(dá)了目的地,網(wǎng)絡(luò)接口會(huì)依據(jù)數(shù)據(jù)包的編號來組裝它們,成為完整的比特?cái)?shù)據(jù)。因此,我們可以想到在這里也是一樣的,我們用inputStream的read方法來通過網(wǎng)卡從網(wǎng)絡(luò)中讀取數(shù)據(jù),并不一定一次就能把所有的數(shù)據(jù)包都讀完,所以我們要不斷的循環(huán)來從inputStream中讀取數(shù)據(jù)。Read方法有一個(gè)int型的返回值,表示每次從inputStream中讀取的字節(jié)數(shù),如果把這個(gè)inputStream中的數(shù)據(jù)讀完了,那么就返回-1。
Read方法最多可以有三個(gè)參數(shù),byte b[]是讀取數(shù)據(jù)之后存放的目標(biāo)數(shù)組,off標(biāo)識了目標(biāo)數(shù)組中存儲(chǔ)的開始位置,len是想要讀取的數(shù)據(jù)長度,這個(gè)長度必定不能大于b[]的長度。
public synchronized int read(byte b[], int off, int len);
我們的目標(biāo)是要把目標(biāo)地址的內(nèi)容下載下來,現(xiàn)在分了5個(gè)線程來分段下載,那么這些分段下載的數(shù)據(jù)保存在哪里呢?如果把它們都保存在內(nèi)存中是非常糟糕的做法,如果文件相當(dāng)之大,例如是一個(gè)視頻的話,難道把這么大的數(shù)據(jù)都放在內(nèi)存中嗎,這樣的話,萬一連接中斷,那前面下載的東西就都沒有了?我們當(dāng)然要想辦法及時(shí)的把下載的數(shù)據(jù)刷到磁盤上保存下來。當(dāng)用bt下載視頻的時(shí)候,通常都會(huì)有個(gè)臨時(shí)文件,當(dāng)視頻完全下載結(jié)束之后,這個(gè)臨時(shí)文件就會(huì)被刪除,那么下次繼續(xù)下載的時(shí)候,就會(huì)接著上次下載的點(diǎn)繼續(xù)下載。所以我們的outputStream就是往這個(gè)臨時(shí)文件來輸出了。
OutputStream的write方法和上面InputStream的read方法有類似的參數(shù),byte b[]是輸出數(shù)據(jù)的來源,off標(biāo)識了開始位置,len是數(shù)據(jù)長度。
public synchronized void write(byte b[], int off, int len) throws IOException;
在往臨時(shí)文件的outputStream中寫數(shù)據(jù)的時(shí)候,我會(huì)加上一個(gè)計(jì)數(shù)器,每滿5000個(gè)比特就往文件中flush一下(代碼⑦處)。
對于輸出流的flush,有些要注意的地方,在程序中有三個(gè)地方調(diào)用了outputStream.flush()。第一個(gè)是在循環(huán)的讀取網(wǎng)絡(luò)數(shù)據(jù)并往outputStream中寫入的時(shí)候,每滿5000個(gè)byte就flush一下(代碼⑦處);第二個(gè)是循環(huán)之后(代碼⑧處),這時(shí)候正常的讀取寫入操作已經(jīng)完成,但是outputStream中還有沒有刷入磁盤的數(shù)據(jù),所以要flush一下才能關(guān)閉連接;第三個(gè)就是在異常中的flush(代碼⑨處),因?yàn)槿绻l(fā)生了連接超時(shí)或者讀取數(shù)據(jù)超時(shí)的話,就會(huì)直接跑到catch的exception中去,這個(gè)時(shí)候outputStream中的數(shù)據(jù)如果不flush的話,重新連接的時(shí)候這部分?jǐn)?shù)據(jù)就會(huì)丟失了。另外,當(dāng)拋出異常,重新連接的時(shí)候,下載的起始位置也要重新設(shè)置(代碼④處),count就是用來標(biāo)識已經(jīng)下載的字節(jié)數(shù)的,把count+startPosition就是新一次連接需要的下載起始位置了。
3、現(xiàn)在每個(gè)分段的下載線程都順利結(jié)束了,也都創(chuàng)建了相應(yīng)的臨時(shí)文件,接下來在主線程中會(huì)對臨時(shí)文件進(jìn)行合并,并寫入目標(biāo)文件,最后刪除臨時(shí)文件。這部分很簡單,就是一個(gè)對所有下載線程進(jìn)行遍歷的過程。這里outputStream也有兩次flush,與上面類似,不再贅述。



































































































