鍍金池/ 教程/ Java/ 剖析同步器
Slipped Conditions
阻塞隊列
無阻塞算法
嵌套管程鎖死
Java 并發(fā)性和多線程介紹
死鎖
線程安全及不可變性
并發(fā)編程模型
Java 中的讀/寫鎖
剖析同步器
競態(tài)條件與臨界區(qū)
多線程的優(yōu)點
CAS
線程通信
如何創(chuàng)建并運行 java 線程
阿姆達爾定律
避免死鎖
信號量
多線程的代價
饑餓和公平
線程池
重入鎖死
Java 中的鎖
Java 內(nèi)存模型
線程安全與共享資源
Java 同步塊

剖析同步器

雖然許多同步器(如鎖,信號量,阻塞隊列等)功能上各不相同,但它們的內(nèi)部設(shè)計上卻差別不大。換句話說,它們內(nèi)部的的基礎(chǔ)部分是相同(或相似)的。了解這些基礎(chǔ)部件能在設(shè)計同步器的時候給我們大大的幫助。這就是本文要細(xì)說的內(nèi)容。

注:本文的內(nèi)容是哥本哈根信息技術(shù)大學(xué)一個由 Jakob Jenkov,Toke Johansen 和 Lars Bj?rn 參與的 M.Sc.學(xué)生項目的部分成果。在此項目期間我們咨詢 Doug Lea 是否知道類似的研究。有趣的是在開發(fā) Java 5 并發(fā)工具包期間他已經(jīng)提出了類似的結(jié)論。Doug Lea 的研究,我相信,在《Java Concurrency in Practice》一書中有描述。這本書有一章“剖析同步器”就類似于本文,但不盡相同。

大部分同步器都是用來保護某個區(qū)域(臨界區(qū))的代碼,這些代碼可能會被多線程并發(fā)訪問。要實現(xiàn)這個目標(biāo),同步器一般要支持下列功能:

  1. 狀態(tài)
  2. 訪問條件
  3. 狀態(tài)變化
  4. 通知策略
  5. Test-and-Set 方法
  6. Set 方法

并不是所有同步器都包含上述部分,也有些并不完全遵照上面的內(nèi)容。但通常你能從中發(fā)現(xiàn)這些部分的一或多個。

狀態(tài)

同步器中的狀態(tài)是用來確定某個線程是否有訪問權(quán)限。在 Lock 中,狀態(tài)是 boolean 類型的,表示當(dāng)前 Lock 對象是否處于鎖定狀態(tài)。在 BoundedSemaphore 中,內(nèi)部狀態(tài)包含一個計數(shù)器(int 類型)和一個上限(int 類型),分別表示當(dāng)前已經(jīng)獲取的許可數(shù)和最大可獲取的許可數(shù)。BlockingQueue 的狀態(tài)是該隊列中元素列表以及隊列的最大容量。

下面是 Lock 和 BoundedSemaphore 中的兩個代碼片段。

public class Lock{
  //state is kept here
  private boolean isLocked = false; 
  public synchronized void lock()
  throws InterruptedException{
    while(isLocked){
      wait();
    }
    isLocked = true;
  }
  ...
}
public class BoundedSemaphore {
  //state is kept here
  private int signals = 0;
  private int bound   = 0;

  public BoundedSemaphore(int upperBound){
    this.bound = upperBound;
  }
  public synchronized void take() throws InterruptedException{
    while(this.signals == bound) wait();
    this.signal++;
    this.notify();
  }
  ...
}

訪問條件

訪問條件決定調(diào)用 test-and-set-state 方法的線程是否可以對狀態(tài)進行設(shè)置。訪問條件一般是基于同步器狀態(tài)的。通常是放在一個 while 循環(huán)里,以避免虛假喚醒問題。訪問條件的計算結(jié)果要么是 true 要么是 false。

Lock 中的訪問條件只是簡單地檢查 isLocked 的值。根據(jù)執(zhí)行的動作是“獲取”還是“釋放”,BoundedSemaphore 實際上有兩個訪問條件。如果某個線程想“獲取”許可,將檢查 signals 變量是否達到上限;如果某個線程想“釋放”許可,將檢查 signals 變量是否為 0。

這里有兩個來自 Lock 和 BoundedSemaphore 的代碼片段,它們都有訪問條件。注意觀察條件是怎樣在 while 循環(huán)中檢查的。

public class Lock{
  private boolean isLocked = false;
  public synchronized void lock()
  throws InterruptedException{
    //access condition
    while(isLocked){
      wait();
    }
    isLocked = true;
  }
  ...
}
public class BoundedSemaphore {
  private int signals = 0;
  private int bound = 0;

  public BoundedSemaphore(int upperBound){
    this.bound = upperBound;
  }
  public synchronized void take() throws InterruptedException{
    //access condition
    while(this.signals == bound) wait();
    this.signals++;
    this.notify();
  }
  public synchronized void release() throws InterruptedException{
    //access condition
    while(this.signals == 0) wait();
    this.signals--;
    this.notify();
  }
}

狀態(tài)變化

一旦一個線程獲得了臨界區(qū)的訪問權(quán)限,它得改變同步器的狀態(tài),讓其它線程阻塞,防止它們進入臨界區(qū)。換而言之,這個狀態(tài)表示正有一個線程在執(zhí)行臨界區(qū)的代碼。其它線程想要訪問臨界區(qū)的時候,該狀態(tài)應(yīng)該影響到訪問條件的結(jié)果。

在 Lock 中,通過代碼設(shè)置 isLocked = true 來改變狀態(tài),在信號量中,改變狀態(tài)的是 signals–或 signals++;

這里有兩個狀態(tài)變化的代碼片段:

public class Lock{

  private boolean isLocked = false;

  public synchronized void lock()
  throws InterruptedException{
    while(isLocked){
      wait();
    }
    //state change
    isLocked = true;
  }

  public synchronized void unlock(){
    //state change
    isLocked = false;
    notify();
  }
}
public class BoundedSemaphore {
  private int signals = 0;
  private int bound   = 0;

  public BoundedSemaphore(int upperBound){
    this.bound = upperBound;
  }

  public synchronized void take() throws InterruptedException{
    while(this.signals == bound) wait();
    //state change
    this.signals++;
    this.notify();
  }

  public synchronized void release() throws InterruptedException{
    while(this.signals == 0) wait();
    //state change
    this.signals--;
    this.notify();
  }
}

通知策略

一旦某個線程改變了同步器的狀態(tài),可能需要通知其它等待的線程狀態(tài)已經(jīng)變了。因為也許這個狀態(tài)的變化會讓其它線程的訪問條件變?yōu)?true。

通知策略通常分為三種:

  1. 通知所有等待的線程
  2. 通知 N 個等待線程中的任意一個
  3. 通知 N 個等待線程中的某個指定的線程

通知所有等待的線程非常簡單。所有等待的線程都調(diào)用的同一個對象上的 wait()方法,某個線程想要通知它們只需在這個對象上調(diào)用 notifyAll()方法。

通知等待線程中的任意一個也很簡單,只需將 notifyAll()調(diào)用換成 notify()即可。調(diào)用 notify 方法沒辦法確定喚醒的是哪一個線程,也就是“等待線程中的任意一個”。

有時候可能需要通知指定的線程而非任意一個等待的線程。例如,如果你想保證線程被通知的順序與它們進入同步塊的順序一致,或按某種優(yōu)先級的順序來通知。想要實現(xiàn)這種需求,每個等待的線程必須在其自有的對象上調(diào)用 wait()。當(dāng)通知線程想要通知某個特定的等待線程時,調(diào)用該線程自有對象的 notify()方法即可。饑餓和公平中有這樣的例子。

下面是通知策略的一個例子(通知任意一個等待線程):

public class Lock{

  private boolean isLocked = false;

  public synchronized void lock()
  throws InterruptedException{
    while(isLocked){
      //wait strategy - related to notification strategy
      wait();
    }
    isLocked = true;
  }

  public synchronized void unlock(){
    isLocked = false;
    notify(); //notification strategy
  }
}

Test-and-Set 方法

同步器中最常見的有兩種類型的方法,test-and-set 是第一種(set 是另一種)。Test-and-set 的意思是,調(diào)用這個方法的線程檢查訪問條件,如若滿足,該線程設(shè)置同步器的內(nèi)部狀態(tài)來表示它已經(jīng)獲得了訪問權(quán)限。

狀態(tài)的改變通常使其它試圖獲取訪問權(quán)限的線程計算條件狀態(tài)時得到 false 的結(jié)果,但并不一定總是如此。例如,在讀寫鎖中,獲取讀鎖的線程會更新讀寫鎖的狀態(tài)來表示它獲取到了讀鎖,但是,只要沒有線程請求寫鎖,其它請求讀鎖的線程也能成功。

test-and-set 很有必要是原子的,也就是說在某個線程檢查和設(shè)置狀態(tài)期間,不允許有其它線程在 test-and-set 方法中執(zhí)行。

test-and-set 方法的程序流通常遵照下面的順序:

  1. 如有必要,在檢查前先設(shè)置狀態(tài)
  2. 檢查訪問條件
  3. 如果訪問條件不滿足,則等待
  4. 如果訪問條件滿足,設(shè)置狀態(tài),如有必要還要通知等待線程

下面的 ReadWriteLock 類的 lockWrite()方法展示了 test-and-set 方法。調(diào)用 lockWrite()的線程在檢查之前先設(shè)置狀態(tài)(writeRequests++)。然后檢查 canGrantWriteAccess()中的訪問條件,如果檢查通過,在退出方法之前再次設(shè)置內(nèi)部狀態(tài)。這個方法中沒有去通知等待線程。

public class ReadWriteLock{
    private Map<Thread, Integer> readingThreads =
        new HashMap<Thread, Integer>();

    private int writeAccesses    = 0;
    private int writeRequests    = 0;
    private Thread writingThread = null;

    ...

    public synchronized void lockWrite() throws InterruptedException{
      writeRequests++;
      Thread callingThread = Thread.currentThread();
      while(! canGrantWriteAccess(callingThread)){
        wait();
      }
      writeRequests--;
      writeAccesses++;
      writingThread = callingThread;
    } 

    ...
}

下面的 BoundedSemaphore 類有兩個 test-and-set 方法:take()和 release()。兩個方法都有檢查和設(shè)置內(nèi)部狀態(tài)。

public class BoundedSemaphore {
  private int signals = 0;
  private int bound   = 0;

  public BoundedSemaphore(int upperBound){
    this.bound = upperBound;
  }

  public synchronized void take() throws InterruptedException{
    while(this.signals == bound) wait();
    this.signals++;
    this.notify();
  }

  public synchronized void release() throws InterruptedException{
    while(this.signals == 0) wait();
    this.signals--;
    this.notify();
  }
}

set 方法

set 方法是同步器中常見的第二種方法。set 方法僅是設(shè)置同步器的內(nèi)部狀態(tài),而不先做檢查。set 方法的一個典型例子是 Lock 類中的 unlock()方法。持有鎖的某個線程總是能夠成功解鎖,而不需要檢查該鎖是否處于解鎖狀態(tài)。

set 方法的程序流通常如下:

  1. 設(shè)置內(nèi)部狀態(tài)
  2. 通知等待的線程

這里是 unlock()方法的一個例子:

public class Lock{
  private boolean isLocked = false;

  public synchronized void unlock(){
    isLocked = false;
    notify();
  }
}