鍍金池/ 教程/ Java/ Java 中的讀/寫鎖
Slipped Conditions
阻塞隊列
無阻塞算法
嵌套管程鎖死
Java 并發(fā)性和多線程介紹
死鎖
線程安全及不可變性
并發(fā)編程模型
Java 中的讀/寫鎖
剖析同步器
競態(tài)條件與臨界區(qū)
多線程的優(yōu)點
CAS
線程通信
如何創(chuàng)建并運(yùn)行 java 線程
阿姆達(dá)爾定律
避免死鎖
信號量
多線程的代價
饑餓和公平
線程池
重入鎖死
Java 中的鎖
Java 內(nèi)存模型
線程安全與共享資源
Java 同步塊

Java 中的讀/寫鎖

相比 Java 中的鎖(Locks in Java)里 Lock 實現(xiàn),讀寫鎖更復(fù)雜一些。假設(shè)你的程序中涉及到對一些共享資源的讀和寫操作,且寫操作沒有讀操作那么頻繁。在沒有寫操作的時候,兩個線程同時讀一個資源沒有任何問題,所以應(yīng)該允許多個線程能在同時讀取共享資源。但是如果有一個線程想去寫這些共享資源,就不應(yīng)該再有其它線程對該資源進(jìn)行讀或?qū)懀?em>譯者注:也就是說:讀-讀能共存,讀-寫不能共存,寫-寫不能共存)。這就需要一個讀/寫鎖來解決這個問題。

Java5 在 java.util.concurrent 包中已經(jīng)包含了讀寫鎖。盡管如此,我們還是應(yīng)該了解其實現(xiàn)背后的原理。

以下是本文的主題

  1. 讀/寫鎖的 Java 實現(xiàn)(Read / Write Lock Java Implementation)
  2. 讀/寫鎖的重入(Read / Write Lock Reentrance)
  3. 讀鎖重入(Read Reentrance)
  4. 寫鎖重入(Write Reentrance)
  5. 讀鎖升級到寫鎖(Read to Write Reentrance)
  6. 寫鎖降級到讀鎖(Write to Read Reentrance)
  7. 可重入的 ReadWriteLock 的完整實現(xiàn)(Fully Reentrant ReadWriteLock)
  8. 在 finally 中調(diào)用 unlock() (Calling unlock() from a finally-clause)

讀/寫鎖的 Java 實現(xiàn)

先讓我們對讀寫訪問資源的條件做個概述:

讀取 沒有線程正在做寫操作,且沒有線程在請求寫操作。

寫入 沒有線程正在做讀寫操作。

如果某個線程想要讀取資源,只要沒有線程正在對該資源進(jìn)行寫操作且沒有線程請求對該資源的寫操作即可。我們假設(shè)對寫操作的請求比對讀操作的請求更重要,就要提升寫請求的優(yōu)先級。此外,如果讀操作發(fā)生的比較頻繁,我們又沒有提升寫操作的優(yōu)先級,那么就會產(chǎn)生“饑餓”現(xiàn)象。請求寫操作的線程會一直阻塞,直到所有的讀線程都從 ReadWriteLock 上解鎖了。如果一直保證新線程的讀操作權(quán)限,那么等待寫操作的線程就會一直阻塞下去,結(jié)果就是發(fā)生“饑餓”。因此,只有當(dāng)沒有線程正在鎖住 ReadWriteLock 進(jìn)行寫操作,且沒有線程請求該鎖準(zhǔn)備執(zhí)行寫操作時,才能保證讀操作繼續(xù)。

當(dāng)其它線程沒有對共享資源進(jìn)行讀操作或者寫操作時,某個線程就有可能獲得該共享資源的寫鎖,進(jìn)而對共享資源進(jìn)行寫操作。有多少線程請求了寫鎖以及以何種順序請求寫鎖并不重要,除非你想保證寫鎖請求的公平性。

按照上面的敘述,簡單的實現(xiàn)出一個讀/寫鎖,代碼如下

public class ReadWriteLock{
    private int readers = 0;
    private int writers = 0;
    private int writeRequests = 0;

    public synchronized void lockRead() 
        throws InterruptedException{
        while(writers > 0 || writeRequests > 0){
            wait();
        }
        readers++;
    }

    public synchronized void unlockRead(){
        readers--;
        notifyAll();
    }

    public synchronized void lockWrite() 
        throws InterruptedException{
        writeRequests++;

        while(readers > 0 || writers > 0){
            wait();
        }
        writeRequests--;
        writers++;
    }

    public synchronized void unlockWrite() 
        throws InterruptedException{
        writers--;
        notifyAll();
    }
}

ReadWriteLock 類中,讀鎖和寫鎖各有一個獲取鎖和釋放鎖的方法。

讀鎖的實現(xiàn)在 lockRead()中,只要沒有線程擁有寫鎖(writers==0),且沒有線程在請求寫鎖(writeRequests ==0),所有想獲得讀鎖的線程都能成功獲取。

寫鎖的實現(xiàn)在 lockWrite()中,當(dāng)一個線程想獲得寫鎖的時候,首先會把寫鎖請求數(shù)加 1(writeRequests++),然后再去判斷是否能夠真能獲得寫鎖,當(dāng)沒有線程持有讀鎖(readers==0 ),且沒有線程持有寫鎖(writers==0)時就能獲得寫鎖。有多少線程在請求寫鎖并無關(guān)系。

需要注意的是,在兩個釋放鎖的方法(unlockRead,unlockWrite)中,都調(diào)用了 notifyAll 方法,而不是 notify。要解釋這個原因,我們可以想象下面一種情形:

如果有線程在等待獲取讀鎖,同時又有線程在等待獲取寫鎖。如果這時其中一個等待讀鎖的線程被 notify 方法喚醒,但因為此時仍有請求寫鎖的線程存在(writeRequests>0),所以被喚醒的線程會再次進(jìn)入阻塞狀態(tài)。然而,等待寫鎖的線程一個也沒被喚醒,就像什么也沒發(fā)生過一樣(譯者注:信號丟失現(xiàn)象)。如果用的是 notifyAll 方法,所有的線程都會被喚醒,然后判斷能否獲得其請求的鎖。

用 notifyAll 還有一個好處。如果有多個讀線程在等待讀鎖且沒有線程在等待寫鎖時,調(diào)用 unlockWrite()后,所有等待讀鎖的線程都能立馬成功獲取讀鎖 —— 而不是一次只允許一個。

讀/寫鎖的重入

上面實現(xiàn)的讀/寫鎖(ReadWriteLock) 是不可重入的,當(dāng)一個已經(jīng)持有寫鎖的線程再次請求寫鎖時,就會被阻塞。原因是已經(jīng)有一個寫線程了——就是它自己。此外,考慮下面的例子:

  1. Thread 1 獲得了讀鎖。
  2. Thread 2 請求寫鎖,但因為 Thread 1 持有了讀鎖,所以寫鎖請求被阻塞。
  3. Thread 1 再想請求一次讀鎖,但因為 Thread 2 處于請求寫鎖的狀態(tài),所以想再次獲取讀鎖也會被阻塞。 上面這種情形使用前面的 ReadWriteLock 就會被鎖定——一種類似于死鎖的情形。不會再有線程能夠成功獲取讀鎖或?qū)戞i了。

為了讓 ReadWriteLock 可重入,需要對它做一些改進(jìn)。下面會分別處理讀鎖的重入和寫鎖的重入。

讀鎖重入

為了讓 ReadWriteLock 的讀鎖可重入,我們要先為讀鎖重入建立規(guī)則:

要保證某個線程中的讀鎖可重入,要么滿足獲取讀鎖的條件(沒有寫或?qū)懻埱螅?,要么已?jīng)持有讀鎖(不管是否有寫請求)。 要確定一個線程是否已經(jīng)持有讀鎖,可以用一個 map 來存儲已經(jīng)持有讀鎖的線程以及對應(yīng)線程獲取讀鎖的次數(shù),當(dāng)需要判斷某個線程能否獲得讀鎖時,就利用 map 中存儲的數(shù)據(jù)進(jìn)行判斷。下面是方法 lockRead 和 unlockRead 修改后的的代碼:

public class ReadWriteLock{
    private Map<Thread, Integer> readingThreads =
        new HashMap<Thread, Integer>();

    private int writers = 0;
    private int writeRequests = 0;

    public synchronized void lockRead() 
        throws InterruptedException{
        Thread callingThread = Thread.currentThread();
        while(! canGrantReadAccess(callingThread)){
            wait();                                                                   
        }

        readingThreads.put(callingThread,
            (getAccessCount(callingThread) + 1));
    }

    public synchronized void unlockRead(){
        Thread callingThread = Thread.currentThread();
        int accessCount = getAccessCount(callingThread);
        if(accessCount == 1) { 
            readingThreads.remove(callingThread); 
        } else {
            readingThreads.put(callingThread, (accessCount -1)); 
        }
        notifyAll();
    }

    private boolean canGrantReadAccess(Thread callingThread){
        if(writers > 0) return false;
        if(isReader(callingThread) return true;
        if(writeRequests > 0) return false;
        return true;
    }

    private int getReadAccessCount(Thread callingThread){
        Integer accessCount = readingThreads.get(callingThread);
        if(accessCount == null) return 0;
        return accessCount.intValue();
    }

    private boolean isReader(Thread callingThread){
        return readingThreads.get(callingThread) != null;
    }
}

代碼中我們可以看到,只有在沒有線程擁有寫鎖的情況下才允許讀鎖的重入。此外,重入的讀鎖比寫鎖優(yōu)先級高。

寫鎖重入

僅當(dāng)一個線程已經(jīng)持有寫鎖,才允許寫鎖重入(再次獲得寫鎖)。下面是方法 lockWrite 和 unlockWrite 修改后的的代碼。

public class ReadWriteLock{
    private Map<Thread, Integer> readingThreads =
        new HashMap<Thread, Integer>();

    private int writeAccesses    = 0;
    private int writeRequests    = 0;
    private Thread writingThread = null;

    public synchronized void lockWrite() 
        throws InterruptedException{
        writeRequests++;
        Thread callingThread = Thread.currentThread();
        while(!canGrantWriteAccess(callingThread)){
            wait();
        }
        writeRequests--;
        writeAccesses++;
        writingThread = callingThread;
    }

    public synchronized void unlockWrite() 
        throws InterruptedException{
        writeAccesses--;
        if(writeAccesses == 0){
            writingThread = null;
        }
        notifyAll();
    }

    private boolean canGrantWriteAccess(Thread callingThread){
        if(hasReaders()) return false;
        if(writingThread == null)    return true;
        if(!isWriter(callingThread)) return false;
        return true;
    }

    private boolean hasReaders(){
        return readingThreads.size() > 0;
    }

    private boolean isWriter(Thread callingThread){
        return writingThread == callingThread;
    }
}

注意在確定當(dāng)前線程是否能夠獲取寫鎖的時候,是如何處理的。

讀鎖升級到寫鎖

有時,我們希望一個擁有讀鎖的線程,也能獲得寫鎖。想要允許這樣的操作,要求這個線程是唯一一個擁有讀鎖的線程。writeLock()需要做點改動來達(dá)到這個目的:

public class ReadWriteLock{
    private Map<Thread, Integer> readingThreads =
        new HashMap<Thread, Integer>();

    private int writeAccesses    = 0;
    private int writeRequests    = 0;
    private Thread writingThread = null;

    public synchronized void lockWrite() 
        throws InterruptedException{
        writeRequests++;
        Thread callingThread = Thread.currentThread();
        while(!canGrantWriteAccess(callingThread)){
            wait();
        }
        writeRequests--;
        writeAccesses++;
        writingThread = callingThread;
    }

    public synchronized void unlockWrite() throws InterruptedException{
        writeAccesses--;
        if(writeAccesses == 0){
            writingThread = null;
        }
        notifyAll();
    }

    private boolean canGrantWriteAccess(Thread callingThread){
        if(isOnlyReader(callingThread)) return true;
        if(hasReaders()) return false;
        if(writingThread == null) return true;
        if(!isWriter(callingThread)) return false;
        return true;
    }

    private boolean hasReaders(){
        return readingThreads.size() > 0;
    }

    private boolean isWriter(Thread callingThread){
        return writingThread == callingThread;
    }

    private boolean isOnlyReader(Thread thread){
        return readers == 1 && readingThreads.get(callingThread) != null;
    }
}

現(xiàn)在 ReadWriteLock 類就可以從讀鎖升級到寫鎖了。

寫鎖降級到讀鎖

有時擁有寫鎖的線程也希望得到讀鎖。如果一個線程擁有了寫鎖,那么自然其它線程是不可能擁有讀鎖或?qū)戞i了。所以對于一個擁有寫鎖的線程,再獲得讀鎖,是不會有什么危險的。我們僅僅需要對上面 canGrantReadAccess 方法進(jìn)行簡單地修改:

public class ReadWriteLock{
    private boolean canGrantReadAccess(Thread callingThread){
        if(isWriter(callingThread)) return true;
        if(writingThread != null) return false;
        if(isReader(callingThread) return true;
        if(writeRequests > 0) return false;
        return true;
    }
}

可重入的 ReadWriteLock 的完整實現(xiàn)

下面是完整的 ReadWriteLock 實現(xiàn)。為了便于代碼的閱讀與理解,簡單對上面的代碼做了重構(gòu)。重構(gòu)后的代碼如下。

public class ReadWriteLock{
    private Map<Thread, Integer> readingThreads =
        new HashMap<Thread, Integer>();

    private int writeAccesses    = 0;
    private int writeRequests    = 0;
    private Thread writingThread = null;

    public synchronized void lockRead() 
        throws InterruptedException{
        Thread callingThread = Thread.currentThread();
        while(! canGrantReadAccess(callingThread)){
            wait();
        }

        readingThreads.put(callingThread,
            (getReadAccessCount(callingThread) + 1));
    }

    private boolean canGrantReadAccess(Thread callingThread){
        if(isWriter(callingThread)) return true;
        if(hasWriter()) return false;
        if(isReader(callingThread)) return true;
        if(hasWriteRequests()) return false;
        return true;
    }

    public synchronized void unlockRead(){
        Thread callingThread = Thread.currentThread();
        if(!isReader(callingThread)){
            throw new IllegalMonitorStateException(
                "Calling Thread does not" +
                " hold a read lock on this ReadWriteLock");
        }
        int accessCount = getReadAccessCount(callingThread);
        if(accessCount == 1){ 
            readingThreads.remove(callingThread); 
        } else { 
            readingThreads.put(callingThread, (accessCount -1));
        }
        notifyAll();
    }

    public synchronized void lockWrite() 
        throws InterruptedException{
        writeRequests++;
        Thread callingThread = Thread.currentThread();
        while(!canGrantWriteAccess(callingThread)){
            wait();
        }
        writeRequests--;
        writeAccesses++;
        writingThread = callingThread;
    }

    public synchronized void unlockWrite() 
        throws InterruptedException{
        if(!isWriter(Thread.currentThread()){
        throw new IllegalMonitorStateException(
            "Calling Thread does not" +
            " hold the write lock on this ReadWriteLock");
        }
        writeAccesses--;
        if(writeAccesses == 0){
            writingThread = null;
        }
        notifyAll();
    }

    private boolean canGrantWriteAccess(Thread callingThread){
        if(isOnlyReader(callingThread)) return true;
        if(hasReaders()) return false;
        if(writingThread == null) return true;
        if(!isWriter(callingThread)) return false;
        return true;
    }

    private int getReadAccessCount(Thread callingThread){
        Integer accessCount = readingThreads.get(callingThread);
        if(accessCount == null) return 0;
        return accessCount.intValue();
    }

    private boolean hasReaders(){
        return readingThreads.size() > 0;
    }

    private boolean isReader(Thread callingThread){
        return readingThreads.get(callingThread) != null;
    }

    private boolean isOnlyReader(Thread callingThread){
        return readingThreads.size() == 1 &&
            readingThreads.get(callingThread) != null;
    }

    private boolean hasWriter(){
        return writingThread != null;
    }

    private boolean isWriter(Thread callingThread){
        return writingThread == callingThread;
    }

    private boolean hasWriteRequests(){
        return this.writeRequests > 0;
    }
}

在 finally 中調(diào)用 unlock()

在利用 ReadWriteLock 來保護(hù)臨界區(qū)時,如果臨界區(qū)可能拋出異常,在 finally 塊中調(diào)用 readUnlock()和 writeUnlock()就顯得很重要了。這樣做是為了保證 ReadWriteLock 能被成功解鎖,然后其它線程可以請求到該鎖。這里有個例子:

lock.lockWrite();
try{
    //do critical section code, which may throw exception
} finally {
    lock.unlockWrite();
}

上面這樣的代碼結(jié)構(gòu)能夠保證臨界區(qū)中拋出異常時 ReadWriteLock 也會被釋放。如果 unlockWrite 方法不是在 finally 塊中調(diào)用的,當(dāng)臨界區(qū)拋出了異常時,ReadWriteLock 會一直保持在寫鎖定狀態(tài),就會導(dǎo)致所有調(diào)用 lockRead()或 lockWrite()的線程一直阻塞。唯一能夠重新解鎖 ReadWriteLock 的因素可能就是 ReadWriteLock 是可重入的,當(dāng)拋出異常時,這個線程后續(xù)還可以成功獲取這把鎖,然后執(zhí)行臨界區(qū)以及再次調(diào)用 unlockWrite(),這就會再次釋放 ReadWriteLock。但是如果該線程后續(xù)不再獲取這把鎖了呢?所以,在 finally 中調(diào)用 unlockWrite 對寫出健壯代碼是很重要的。