鍍金池/ 教程/ Java/ 線程池
Slipped Conditions
阻塞隊列
無阻塞算法
嵌套管程鎖死
Java 并發(fā)性和多線程介紹
死鎖
線程安全及不可變性
并發(fā)編程模型
Java 中的讀/寫鎖
剖析同步器
競態(tài)條件與臨界區(qū)
多線程的優(yōu)點(diǎn)
CAS
線程通信
如何創(chuàng)建并運(yùn)行 java 線程
阿姆達(dá)爾定律
避免死鎖
信號量
多線程的代價
饑餓和公平
線程池
重入鎖死
Java 中的鎖
Java 內(nèi)存模型
線程安全與共享資源
Java 同步塊

線程池

線程池(Thread Pool)對于限制應(yīng)用程序中同一時刻運(yùn)行的線程數(shù)很有用。因為每啟動一個新線程都會有相應(yīng)的性能開銷,每個線程都需要給棧分配一些內(nèi)存等等。

我們可以把并發(fā)執(zhí)行的任務(wù)傳遞給一個線程池,來替代為每個并發(fā)執(zhí)行的任務(wù)都啟動一個新的線程。只要池里有空閑的線程,任務(wù)就會分配給一個線程執(zhí)行。在線程池的內(nèi)部,任務(wù)被插入一個阻塞隊列(Blocking Queue),線程池里的線程會去取這個隊列里的任務(wù)。當(dāng)一個新任務(wù)插入隊列時,一個空閑線程就會成功的從隊列中取出任務(wù)并且執(zhí)行它。

線程池經(jīng)常應(yīng)用在多線程服務(wù)器上。每個通過網(wǎng)絡(luò)到達(dá)服務(wù)器的連接都被包裝成一個任務(wù)并且傳遞給線程池。線程池的線程會并發(fā)的處理連接上的請求。以后會再深入有關(guān) Java 實現(xiàn)多線程服務(wù)器的細(xì)節(jié)。

Java 5 在 java.util.concurrent 包中自帶了內(nèi)置的線程池,所以你不用非得實現(xiàn)自己的線程池。你可以閱讀我寫的 java.util.concurrent.ExecutorService 的文章以了解更多有關(guān)內(nèi)置線程池的知識。不過無論如何,知道一點(diǎn)關(guān)于線程池實現(xiàn)的知識總是有用的。

這里有一個簡單的線程池實現(xiàn):

public class ThreadPool {

  private BlockingQueue taskQueue = null;
  private List<PoolThread> threads = new ArrayList<PoolThread>();
  private boolean isStopped = false;

  public ThreadPool(int noOfThreads, int maxNoOfTasks) {
    taskQueue = new BlockingQueue(maxNoOfTasks);

    for (int i=0; i<noOfThreads; i++) {
      threads.add(new PoolThread(taskQueue));
    }
    for (PoolThread thread : threads) {
      thread.start();
    }
  }

  public void synchronized execute(Runnable task) {
    if(this.isStopped) throw
      new IllegalStateException("ThreadPool is stopped");

    this.taskQueue.enqueue(task);
  }

  public synchronized boolean stop() {
    this.isStopped = true;
    for (PoolThread thread : threads) {
      thread.stop();
    }
  }

}

(校注:原文有編譯錯誤,我修改了下)

public class PoolThread extends Thread {

  private BlockingQueue<Runnable> taskQueue = null;
  private boolean       isStopped = false;

  public PoolThread(BlockingQueue<Runnable> queue) {
    taskQueue = queue;
  }

  public void run() {
    while (!isStopped()) {
      try {
        Runnable runnable =taskQueue.take();
        runnable.run();
      } catch(Exception e) {
        // 寫日志或者報告異常,
        // 但保持線程池運(yùn)行.
      }
    }
  }

  public synchronized void toStop() {
    isStopped = true;
    this.interrupt(); // 打斷池中線程的 dequeue() 調(diào)用.
  }

  public synchronized boolean isStopped() {
    return isStopped;
  }
}

線程池的實現(xiàn)由兩部分組成。類 ThreadPool 是線程池的公開接口,而類 PoolThread 用來實現(xiàn)執(zhí)行任務(wù)的子線程。

為了執(zhí)行一個任務(wù),方法 ThreadPool.execute(Runnable r)用 Runnable 的實現(xiàn)作為調(diào)用參數(shù)。在內(nèi)部,Runnable 對象被放入阻塞隊列 (Blocking Queue) ,等待著被子線程取出隊列。

一個空閑的 PoolThread 線程會把 Runnable 對象從隊列中取出并執(zhí)行。你可以在 PoolThread.run()方法里看到這些代碼。執(zhí)行完畢后,PoolThread 進(jìn)入循環(huán)并且嘗試從隊列中再取出一個任務(wù),直到線程終止。

調(diào)用 ThreadPool.stop()方法可以停止 ThreadPool。在內(nèi)部,調(diào)用 stop 先會標(biāo)記 isStopped 成員變量(為 true)。然后,線程池的每一個子線程都調(diào)用 PoolThread.stop()方法停止運(yùn)行。注意,如果線程池的 execute()在 stop()之后調(diào)用,execute()方法會拋出 IllegalStateException 異常。

子線程會在完成當(dāng)前執(zhí)行的任務(wù)后停止。注意 PoolThread.stop() 方法中調(diào)用了 this.interrupt()。它確保阻塞在 taskQueue.dequeue() 里的 wait()調(diào)用的線程能夠跳出 wait()調(diào)用(校對注:因為執(zhí)行了中斷 interrupt,它能夠打斷這個調(diào)用),并且拋出一個 InterruptedException 異常離開 dequeue()方法。這個異常在 PoolThread.run()方法中被截獲、報告,然后再檢查 isStopped 變量。由于 isStopped 的值是 true, 因此 PoolThread.run()方法退出,子線程終止。

(校對注:看完覺得不過癮?更詳細(xì)的線程池文章參見 Java 線程池的分析和使用)