鍍金池/ 教程/ Python/ 并發(fā)編程
類與對(duì)象
模塊與包
數(shù)據(jù)編碼和處理
元編程
網(wǎng)絡(luò)與 Web 編程
數(shù)字日期和時(shí)間
測試、調(diào)試和異常
字符串和文本
文件與 IO
腳本編程與系統(tǒng)管理
迭代器與生成器
函數(shù)
C 語言擴(kuò)展
并發(fā)編程
數(shù)據(jù)結(jié)構(gòu)和算法

并發(fā)編程

對(duì)于并發(fā)編程, Python 有多種長期支持的方法, 包括多線程, 調(diào)用子進(jìn)程, 以及各種各樣的關(guān)于生成器函數(shù)的技巧. 這一章將會(huì)給出并發(fā)編程各種方面的技巧, 包括通用的多線程技術(shù)以及并行計(jì)算的實(shí)現(xiàn)方法.

像經(jīng)驗(yàn)豐富的程序員所知道的那樣, 大家擔(dān)心并發(fā)的程序有潛在的危險(xiǎn). 因此, 本章的主要目標(biāo)之一是給出更加可信賴和易調(diào)試的代碼.

啟動(dòng)與停止線程

問題

你要為需要并發(fā)執(zhí)行的代碼創(chuàng)建/銷毀線程

解決方案

threading庫可以在單獨(dú)的線程中執(zhí)行任何的在 Python 中可以調(diào)用的對(duì)象。你可以創(chuàng)建一個(gè) Thread 對(duì)象并將你要執(zhí)行的對(duì)象以 target 參數(shù)的形式提供給該對(duì)象。 下面是一個(gè)簡單的例子:

# Code to execute in an independent thread
import time
def countdown(n):
    while n > 0:
        print('T-minus', n)
        n -= 1
        time.sleep(5)

# Create and launch a thread
from threading import Thread
t = Thread(target=countdown, args=(10,))
t.start()

當(dāng)你創(chuàng)建好一個(gè)線程對(duì)象后,該對(duì)象并不會(huì)立即執(zhí)行,除非你調(diào)用它的 start() 方法(當(dāng)你調(diào)用 start() 方法時(shí),它會(huì)調(diào)用你傳遞進(jìn)來的函數(shù),并把你傳遞進(jìn)來的參數(shù)傳遞給該函數(shù))。Python 中的線程會(huì)在一個(gè)單獨(dú)的系統(tǒng)級(jí)線程中執(zhí)行(比如說一個(gè) POSIX 線程或者一個(gè) Windows 線程),這些線程將由操作系統(tǒng)來全權(quán)管理。線程一旦啟動(dòng),將獨(dú)立執(zhí)行直到目標(biāo)函數(shù)返回。你可以查詢一個(gè)線程對(duì)象的狀態(tài),看它是否還在執(zhí)行:

if t.is_alive():
    print('Still running')
else:
    print('Completed')

你也可以將一個(gè)線程加入到當(dāng)前線程,并等待它終止:

t.join()

Python 解釋器在所有線程都終止后才繼續(xù)執(zhí)行代碼剩余的部分。對(duì)于需要長時(shí)間運(yùn)行的線程或者需要一直運(yùn)行的后臺(tái)任務(wù),你應(yīng)當(dāng)考慮使用后臺(tái)線程。 例如:

t = Thread(target=countdown, args=(10,), daemon=True)
t.start()

后臺(tái)線程無法等待,不過,這些線程會(huì)在主線程終止時(shí)自動(dòng)銷毀。 除了如上所示的兩個(gè)操作,并沒有太多可以對(duì)線程做的事情。你無法結(jié)束一個(gè)線程,無法給它發(fā)送信號(hào),無法調(diào)整它的調(diào)度,也無法執(zhí)行其他高級(jí)操作。如果需要這些特性,你需要自己添加。比如說,如果你需要終止線程,那么這個(gè)線程必須通過編程在某個(gè)特定點(diǎn)輪詢來退出。你可以像下邊這樣把線程放入一個(gè)類中:

class CountdownTask:
    def __init__(self):
        self._running = True

    def terminate(self):
        self._running = False

    def run(self, n):
        while self._running and n > 0:
            print('T-minus', n)
            n -= 1
            time.sleep(5)

    c = CountdownTask()
    t = Thread(target=c.run, args=(10,))
    t.start()
    c.terminate() # Signal termination
    t.join()      # Wait for actual termination (if needed)

如果線程執(zhí)行一些像 I/O 這樣的阻塞操作,那么通過輪詢來終止線程將使得線程之間的協(xié)調(diào)變得非常棘手。比如,如果一個(gè)線程一直阻塞在一個(gè) I/O 操作上,它就永遠(yuǎn)無法返回,也就無法檢查自己是否已經(jīng)被結(jié)束了。要正確處理這些問題,你需要利用超時(shí)循環(huán)來小心操作線程。 例子如下:

class IOTask:
    def terminate(self):
        self._running = False

    def run(self, sock):
        # sock is a socket
        sock.settimeout(5)        # Set timeout period
        while self._running:
            # Perform a blocking I/O operation w/ timeout
            try:
                data = sock.recv(8192)
                break
            except socket.timeout:
                continue
            # Continued processing
            ...
        # Terminated
        return

討論

由于全局解釋鎖(GIL)的原因,Python 的線程被限制到同一時(shí)刻只允許一個(gè)線程執(zhí)行這樣一個(gè)執(zhí)行模型。所以,Python 的線程更適用于處理 I/O 和其他需要并發(fā)執(zhí)行的阻塞操作(比如等待 I/O、等待從數(shù)據(jù)庫獲取數(shù)據(jù)等等),而不是需要多處理器并行的計(jì)算密集型任務(wù)。

有時(shí)你會(huì)看到下邊這種通過繼承 Thread 類來實(shí)現(xiàn)的線程:

from threading import Thread

class CountdownThread(Thread):
    def __init__(self, n):
        super().__init__()
        self.n = 0
    def run(self):
        while self.n > 0:

            print('T-minus', self.n)
            self.n -= 1
            time.sleep(5)

c = CountdownThread(5)
c.start()

盡管這樣也可以工作,但這使得你的代碼依賴于 threading 庫,所以你的這些代碼只能在線程上下文中使用。上文所寫的那些代碼、函數(shù)都是與 threading 庫無關(guān)的,這樣就使得這些代碼可以被用在其他的上下文中,可能與線程有關(guān),也可能與線程無關(guān)。比如,你可以通過 multiprocessing 模塊在一個(gè)單獨(dú)的進(jìn)程中執(zhí)行你的代碼:

import multiprocessing
c = CountdownTask(5)
p = multiprocessing.Process(target=c.run)
p.start()

再次重申,這段代碼僅適用于 CountdownTask 類是以獨(dú)立于實(shí)際的并發(fā)手段(多線程、多進(jìn)程等等)實(shí)現(xiàn)的情況。

判斷線程是否已經(jīng)啟動(dòng)

問題

你已經(jīng)啟動(dòng)了一個(gè)線程,但是你想知道它是不是真的已經(jīng)開始運(yùn)行了。

解決方案

線程的一個(gè)關(guān)鍵特性是每個(gè)線程都是獨(dú)立運(yùn)行且狀態(tài)不可預(yù)測。如果程序中的其他線程需要通過判斷某個(gè)線程的狀態(tài)來確定自己下一步的操作,這時(shí)線程同步問題就會(huì)變得非常棘手。為了解決這些問題,我們需要使用 threading 庫中的 Event 對(duì)象。 Event 對(duì)象包含一個(gè)可由線程設(shè)置的信號(hào)標(biāo)志,它允許線程等待某些事件的發(fā)生。在初始情況下,event 對(duì)象中的信號(hào)標(biāo)志被設(shè)置為假。如果有線程等待一個(gè) event 對(duì)象,而這個(gè) event 對(duì)象的標(biāo)志為假,那么這個(gè)線程將會(huì)被一直阻塞直至該標(biāo)志為真。一個(gè)線程如果將一個(gè) event 對(duì)象的信號(hào)標(biāo)志設(shè)置為真,它將喚醒所有等待這個(gè) event 對(duì)象的線程。如果一個(gè)線程等待一個(gè)已經(jīng)被設(shè)置為真的 event 對(duì)象,那么它將忽略這個(gè)事件,繼續(xù)執(zhí)行。 下邊的代碼展示了如何使用 Event 來協(xié)調(diào)線程的啟動(dòng):

from threading import Thread, Event
import time

# Code to execute in an independent thread
def countdown(n, started_evt):
    print('countdown starting')
    started_evt.set()
    while n > 0:
        print('T-minus', n)
        n -= 1
        time.sleep(5)

# Create the event object that will be used to signal startup
started_evt = Event()

# Launch the thread and pass the startup event
print('Launching countdown')
t = Thread(target=countdown, args=(10,started_evt))
t.start()

# Wait for the thread to start
started_evt.wait()
print('countdown is running')

當(dāng)你執(zhí)行這段代碼,“countdown is running” 總是顯示在 “countdown starting” 之后顯示。這是由于使用 event 來協(xié)調(diào)線程,使得主線程要等到 countdown() 函數(shù)輸出啟動(dòng)信息后,才能繼續(xù)執(zhí)行。

討論

event 對(duì)象最好單次使用,就是說,你創(chuàng)建一個(gè) event 對(duì)象,讓某個(gè)線程等待這個(gè)對(duì)象,一旦這個(gè)對(duì)象被設(shè)置為真,你就應(yīng)該丟棄它。盡管可以通過 clear() 方法來重置 event 對(duì)象,但是很難確保安全地清理 event 對(duì)象并對(duì)它重新賦值。很可能會(huì)發(fā)生錯(cuò)過事件、死鎖或者其他問題(特別是,你無法保證重置 event 對(duì)象的代碼會(huì)在線程再次等待這個(gè) event 對(duì)象之前執(zhí)行)。如果一個(gè)線程需要不停地重復(fù)使用 event 對(duì)象,你最好使用 Condition 對(duì)象來代替。下面的代碼使用 Condition 對(duì)象實(shí)現(xiàn)了一個(gè)周期定時(shí)器,每當(dāng)定時(shí)器超時(shí)的時(shí)候,其他線程都可以監(jiān)測到:

import threading
import time

class PeriodicTimer:
    def __init__(self, interval):
        self._interval = interval
        self._flag = 0
        self._cv = threading.Condition()

    def start(self):
        t = threading.Thread(target=self.run)
        t.daemon = True

        t.start()

    def run(self):
        '''
        Run the timer and notify waiting threads after each interval
        '''
        while True:
            time.sleep(self._interval)
            with self._cv:
                 self._flag ^= 1
                 self._cv.notify_all()

    def wait_for_tick(self):
        '''
        Wait for the next tick of the timer
        '''
        with self._cv:
            last_flag = self._flag
            while last_flag == self._flag:
                self._cv.wait()

# Example use of the timer
ptimer = PeriodicTimer(5)
ptimer.start()

# Two threads that synchronize on the timer
def countdown(nticks):
    while nticks > 0:
        ptimer.wait_for_tick()
        print('T-minus', nticks)
        nticks -= 1

def countup(last):
    n = 0
    while n < last:
        ptimer.wait_for_tick()
        print('Counting', n)
        n += 1

threading.Thread(target=countdown, args=(10,)).start()
threading.Thread(target=countup, args=(5,)).start()

event 對(duì)象的一個(gè)重要特點(diǎn)是當(dāng)它被設(shè)置為真時(shí)會(huì)喚醒所有等待它的線程。如果你只想喚醒單個(gè)線程,最好是使用信號(hào)量或者 Condition 對(duì)象來替代??紤]一下這段使用信號(hào)量實(shí)現(xiàn)的代碼:

# Worker thread
def worker(n, sema):
    # Wait to be signaled
    sema.acquire()

    # Do some work
    print('Working', n)

# Create some threads
sema = threading.Semaphore(0)
nworkers = 10
for n in range(nworkers):
    t = threading.Thread(target=worker, args=(n, sema,))
    t.start()

運(yùn)行上邊的代碼將會(huì)啟動(dòng)一個(gè)線程池,但是并沒有什么事情發(fā)生。這是因?yàn)樗械木€程都在等待獲取信號(hào)量。每次信號(hào)量被釋放,只有一個(gè)線程會(huì)被喚醒并執(zhí)行,示例如下:

>>> sema.release()
Working 0
>>> sema.release()
Working 1
>>>

編寫涉及到大量的線程間同步問題的代碼會(huì)讓你痛不欲生。比較合適的方式是使用隊(duì)列來進(jìn)行線程間通信或者每個(gè)把線程當(dāng)作一個(gè) Actor,利用 Actor 模型來控制并發(fā)。下一節(jié)將會(huì)介紹到隊(duì)列,而 Actor 模型將在12.10節(jié)介紹。

線程間通信

問題

你的程序中有多個(gè)線程,你需要在這些線程之間安全地交換信息或數(shù)據(jù)

解決方案

從一個(gè)線程向另一個(gè)線程發(fā)送數(shù)據(jù)最安全的方式可能就是使用 queue 庫中的隊(duì)列了。創(chuàng)建一個(gè)被多個(gè)線程共享的 Queue 對(duì)象,這些線程通過使用put()get() 操作來向隊(duì)列中添加或者刪除元素。 例如:

Queue對(duì)象已經(jīng)包含了必要的鎖,所以你可以通過它在多個(gè)線程間多安全地共享數(shù)據(jù)。 當(dāng)使用隊(duì)列時(shí),協(xié)調(diào)生產(chǎn)者和消費(fèi)者的關(guān)閉問題可能會(huì)有一些麻煩。一個(gè)通用的解決方法是在隊(duì)列中放置一個(gè)特殊的只,當(dāng)消費(fèi)者讀到這個(gè)值的時(shí)候,終止執(zhí)行。例如:

from queue import Queue
from threading import Thread

# Object that signals shutdown
_sentinel = object()

# A thread that produces data
def producer(out_q):
    while running:
        # Produce some data
        ...
        out_q.put(data)

    # Put the sentinel on the queue to indicate completion
    out_q.put(_sentinel)

# A thread that consumes data
def consumer(in_q):
    while True:
        # Get some data
        data = in_q.get()

        # Check for termination
        if data is _sentinel:
            in_q.put(_sentinel)
            break

        # Process the data
        ...

本例中有一個(gè)特殊的地方:消費(fèi)者在讀到這個(gè)特殊值之后立即又把它放回到隊(duì)列中,將之傳遞下去。這樣,所有監(jiān)聽這個(gè)隊(duì)列的消費(fèi)者線程就可以全部關(guān)閉了。 盡管隊(duì)列是最常見的線程間通信機(jī)制,但是仍然可以自己通過創(chuàng)建自己的數(shù)據(jù)結(jié)構(gòu)并添加所需的鎖和同步機(jī)制來實(shí)現(xiàn)線程間通信。最常見的方法是使用Condition變量來包裝你的數(shù)據(jù)結(jié)構(gòu)。下邊這個(gè)例子演示了如何創(chuàng)建一個(gè)線程安全的優(yōu)先級(jí)隊(duì)列,如同1.5節(jié)中介紹的那樣。

import heapq
import threading

class PriorityQueue:
    def __init__(self):
        self._queue = []
        self._count = 0
        self._cv = threading.Condition()
    def put(self, item, priority):
        with self._cv:
            heapq.heappush(self._queue, (-priority, self._count, item))
            self._count += 1
            self._cv.notify()

    def get(self):
        with self._cv:
            while len(self._queue) == 0:
                self._cv.wait()
            return heapq.heappop(self._queue)[-1]

使用隊(duì)列來進(jìn)行線程間通信是一個(gè)單向、不確定的過程。通常情況下,你沒有辦法知道接收數(shù)據(jù)的線程是什么時(shí)候接收到的數(shù)據(jù)并開始工作的。不過隊(duì)列對(duì)象提供一些基本完成的特性,比如下邊這個(gè)例子中的task_done()join()

from queue import Queue
from threading import Thread

# A thread that produces data
def producer(out_q):
    while running:
        # Produce some data
        ...
        out_q.put(data)

# A thread that consumes data
def consumer(in_q):
    while True:
        # Get some data
        data = in_q.get()

        # Process the data
        ...
        # Indicate completion
        in_q.task_done()

# Create the shared queue and launch both threads
q = Queue()
t1 = Thread(target=consumer, args=(q,))
t2 = Thread(target=producer, args=(q,))
t1.start()
t2.start()

# Wait for all produced items to be consumed
q.join()

如果一個(gè)線程需要在一個(gè)“消費(fèi)者”線程處理完特定的數(shù)據(jù)項(xiàng)時(shí)立即得到通知,你可以把要發(fā)送的數(shù)據(jù)和一個(gè) Event 放到一起使用,這樣“生產(chǎn)者”就可以通過這個(gè) Event 對(duì)象來監(jiān)測處理的過程了。示例如下:

from queue import Queue
from threading import Thread, Event

# A thread that produces data
def producer(out_q):
    while running:
        # Produce some data
        ...
        # Make an (data, event) pair and hand it to the consumer
        evt = Event()
        out_q.put((data, evt))
        ...
        # Wait for the consumer to process the item
        evt.wait()

# A thread that consumes data
def consumer(in_q):
    while True:
        # Get some data
        data, evt = in_q.get()
        # Process the data
        ...
        # Indicate completion
        evt.set()

討論

基于簡單隊(duì)列編寫多線程程序在多數(shù)情況下是一個(gè)比較明智的選擇。從線程安全隊(duì)列的底層實(shí)現(xiàn)來看,你無需在你的代碼中使用鎖和其他底層的同步機(jī)制,這些只會(huì)把你的程序弄得亂七八糟。此外,使用隊(duì)列這種基于消息的通信機(jī)制可以被擴(kuò)展到更大的應(yīng)用范疇,比如,你可以把你的程序放入多個(gè)進(jìn)程甚至是分布式系統(tǒng)而無需改變底層的隊(duì)列結(jié)構(gòu)。 使用線程隊(duì)列有一個(gè)要注意的問題是,向隊(duì)列中添加數(shù)據(jù)項(xiàng)時(shí)并不會(huì)復(fù)制此數(shù)據(jù)項(xiàng),線程間通信實(shí)際上是在線程間傳遞對(duì)象引用。如果你擔(dān)心對(duì)象的共享狀態(tài),那你最好只傳遞不可修改的數(shù)據(jù)結(jié)構(gòu)(如:整型、字符串或者元組)或者一個(gè)對(duì)象的深拷貝。例如:

from queue import Queue
from threading import Thread
import copy

# A thread that produces data
def producer(out_q):
    while True:
        # Produce some data
        ...
        out_q.put(copy.deepcopy(data))

# A thread that consumes data
def consumer(in_q):
    while True:
        # Get some data
        data = in_q.get()
        # Process the data
        ...

Queue 對(duì)象提供一些在當(dāng)前上下文很有用的附加特性。比如在創(chuàng)建 Queue 對(duì)象時(shí)提供可選的size 參數(shù)來限制可以添加到隊(duì)列中的元素?cái)?shù)量。對(duì)于“生產(chǎn)者”與“消費(fèi)者”速度有差異的情況,為隊(duì)列中的元素?cái)?shù)量添加上限是有意義的。比如,一個(gè)“生產(chǎn)者”產(chǎn)生項(xiàng)目的速度比“消費(fèi)者” “消費(fèi)”的速度快,那么使用固定大小的隊(duì)列就可以在隊(duì)列已滿的時(shí)候阻塞隊(duì)列,以免未預(yù)期的連鎖效應(yīng)擴(kuò)散整個(gè)程序造成死鎖或者程序運(yùn)行失常。在通信的線程之間進(jìn)行“流量控制”是一個(gè)看起來容易實(shí)現(xiàn)起來困難的問題。如果你發(fā)現(xiàn)自己曾經(jīng)試圖通過擺弄隊(duì)列大小來解決一個(gè)問題,這也許就標(biāo)志著你的程序可能存在脆弱設(shè)計(jì)或者固有的可伸縮問題。 get()put()方法都支持非阻塞方式和設(shè)定超時(shí),例如:

import queue
q = queue.Queue()

try:
    data = q.get(block=False)
except queue.Empty:
    ...

try:
    q.put(item, block=False)
except queue.Full:
    ...

try:
    data = q.get(timeout=5.0)
except queue.Empty:
    ...

這些操作都可以用來避免當(dāng)執(zhí)行某些特定隊(duì)列操作時(shí)發(fā)生無限阻塞的情況,比如,一個(gè)非阻塞的put() 方法和一個(gè)固定大小的隊(duì)列一起使用,這樣當(dāng)隊(duì)列已滿時(shí)就可以執(zhí)行不同的代碼。比如輸出一條日志信息并丟棄。

def producer(q):
    ...
    try:
        q.put(item, block=False)
    except queue.Full:
        log.warning('queued item %r discarded!', item)

如果你試圖讓消費(fèi)者線程在執(zhí)行像 q.get()這樣的操作時(shí),超時(shí)自動(dòng)終止以便檢查終止標(biāo)志,你應(yīng)該使用q.get()的可選參數(shù) timeout,如下:

_running = True

def consumer(q):
    while _running:
        try:
            item = q.get(timeout=5.0)
            # Process item
            ...
        except queue.Empty:
            pass

最后,有 q.qsize() , q.full()q.empty() 等實(shí)用方法可以獲取一個(gè)隊(duì)列的當(dāng)前大小和狀態(tài)。但要注意,這些方法都不是線程安全的??赡苣銓?duì)一個(gè)隊(duì)列使用 empty()判斷出這個(gè)隊(duì)列為空,但同時(shí)另外一個(gè)線程可能已經(jīng)向這個(gè)隊(duì)列中插入一個(gè)數(shù)據(jù)項(xiàng)。所以,你最好不要在你的代碼中使用這些方法。

給關(guān)鍵部分加鎖

問題

你需要對(duì)多線程程序中的臨界區(qū)加鎖以避免競爭條件。

解決方案

要在多線程程序中安全使用可變對(duì)象,你需要使用 threading 庫中的 Lock 對(duì)象,就像下邊這個(gè)例子這樣:

import threading

class SharedCounter:
    '''
    A counter object that can be shared by multiple threads.
    '''
    def __init__(self, initial_value = 0):
        self._value = initial_value
        self._value_lock = threading.Lock()

    def incr(self,delta=1):
        '''
        Increment the counter with locking
        '''
        with self._value_lock:
             self._value += delta

    def decr(self,delta=1):
        '''
        Decrement the counter with locking
        '''
        with self._value_lock:
             self._value -= delta

Lock 對(duì)象和 with 語句塊一起使用可以保證互斥執(zhí)行,就是每次只有一個(gè)線程可以執(zhí)行 with 語句包含的代碼塊。with 語句會(huì)在這個(gè)代碼塊執(zhí)行前自動(dòng)獲取鎖,在執(zhí)行結(jié)束后自動(dòng)釋放鎖。

討論

線程調(diào)度本質(zhì)上是不確定的,因此,在多線程程序中錯(cuò)誤地使用鎖機(jī)制可能會(huì)導(dǎo)致隨機(jī)數(shù)據(jù)損壞或者其他的異常行為,我們稱之為競爭條件。為了避免競爭條件,最好只在臨界區(qū)(對(duì)臨界資源進(jìn)行操作的那部分代碼)使用鎖。 在一些“老的” Python 代碼中,顯式獲取和釋放鎖是很常見的。下邊是一個(gè)上一個(gè)例子的變種:

import threading

class SharedCounter:
    '''
    A counter object that can be shared by multiple threads.
    '''
    def __init__(self, initial_value = 0):
        self._value = initial_value
        self._value_lock = threading.Lock()

    def incr(self,delta=1):
        '''
        Increment the counter with locking
        '''
        self._value_lock.acquire()
        self._value += delta
        self._value_lock.release()

    def decr(self,delta=1):
        '''
        Decrement the counter with locking
        '''
        self._value_lock.acquire()
        self._value -= delta
        self._value_lock.release()

相比于這種顯式調(diào)用的方法,with 語句更加優(yōu)雅,也更不容易出錯(cuò),特別是程序員可能會(huì)忘記調(diào)用 release() 方法或者程序在獲得鎖之后產(chǎn)生異常這兩種情況(使用 with 語句可以保證在這兩種情況下仍能正確釋放鎖)。 為了避免出現(xiàn)死鎖的情況,使用鎖機(jī)制的程序應(yīng)該設(shè)定為每個(gè)線程一次只允許獲取一個(gè)鎖。如果不能這樣做的話,你就需要更高級(jí)的死鎖避免機(jī)制,我們將在12.5節(jié)介紹。 在 threading庫中還提供了其他的同步原語,比如 RLoctSemaphore 對(duì)象。但是根據(jù)以往經(jīng)驗(yàn),這些原語是用于一些特殊的情況,如果你只是需要簡單地對(duì)可變對(duì)象進(jìn)行鎖定,那就不應(yīng)該使用它們。一個(gè) RLock(可重入鎖)可以被同一個(gè)線程多次獲取,主要用來實(shí)現(xiàn)基于監(jiān)測對(duì)象模式的鎖定和同步。在使用這種鎖的情況下,當(dāng)鎖被持有時(shí),只有一個(gè)線程可以使用完整的函數(shù)或者類中的方法。比如,你可以實(shí)現(xiàn)一個(gè)這樣的 SharedCounter 類:

import threading

class SharedCounter:
    '''
    A counter object that can be shared by multiple threads.
    '''
    _lock = threading.RLock()
    def __init__(self, initial_value = 0):
        self._value = initial_value

    def incr(self,delta=1):
        '''
        Increment the counter with locking
        '''
        with SharedCounter._lock:
            self._value += delta

    def decr(self,delta=1):
        '''
        Decrement the counter with locking
        '''
        with SharedCounter._lock:
             self.incr(-delta)

在上邊這個(gè)例子中,沒有對(duì)每一個(gè)實(shí)例中的可變對(duì)象加鎖,取而代之的是一個(gè)被所有實(shí)例共享的類級(jí)鎖。這個(gè)鎖用來同步類方法,具體來說就是,這個(gè)鎖可以保證一次只有一個(gè)線程可以調(diào)用這個(gè)類方法。不過,與一個(gè)標(biāo)準(zhǔn)的鎖不同的是,已經(jīng)持有這個(gè)鎖的方法在調(diào)用同樣使用這個(gè)鎖的方法時(shí),無需再次獲取鎖。比如 decr 方法。 這種實(shí)現(xiàn)方式的一個(gè)特點(diǎn)是,無論這個(gè)類有多少個(gè)實(shí)例都只用一個(gè)鎖。因此在需要大量使用計(jì)數(shù)器的情況下內(nèi)存效率更高。不過這樣做也有缺點(diǎn),就是在程序中使用大量線程并頻繁更新計(jì)數(shù)器時(shí)會(huì)有爭用鎖的問題。 信號(hào)量對(duì)象是一個(gè)建立在共享計(jì)數(shù)器基礎(chǔ)上的同步原語。如果計(jì)數(shù)器不為0,with 語句將計(jì)數(shù)器減1,線程被允許執(zhí)行。with 語句執(zhí)行結(jié)束后,計(jì)數(shù)器加1。如果計(jì)數(shù)器為0,線程將被阻塞,直到其他線程結(jié)束將計(jì)數(shù)器加1。盡管你可以在程序中像標(biāo)準(zhǔn)鎖一樣使用信號(hào)量來做線程同步,但是這種方式并不被推薦,因?yàn)槭褂眯盘?hào)量為程序增加的復(fù)雜性會(huì)影響程序性能。相對(duì)于簡單地作為鎖使用,信號(hào)量更適用于那些需要在線程之間引入信號(hào)或者限制的程序。比如,你需要限制一段代碼的并發(fā)訪問量,你就可以像下面這樣使用信號(hào)量完成:

from threading import Semaphore
import urllib.request

# At most, five threads allowed to run at once
_fetch_url_sema = Semaphore(5)

def fetch_url(url):
    with _fetch_url_sema:
        return urllib.request.urlopen(url)

如果你對(duì)線程同步原語的底層理論和實(shí)現(xiàn)感興趣,可以參考操作系統(tǒng)相關(guān)書籍,絕大多數(shù)都有提及。

防止死鎖的加鎖機(jī)制

問題

你正在寫一個(gè)多線程程序,其中線程需要一次獲取多個(gè)鎖,此時(shí)如何避免死鎖問題。

解決方案

在多線程程序中,死鎖問題很大一部分是由于線程同時(shí)獲取多個(gè)鎖造成的。舉個(gè)例子:一個(gè)線程獲取了第一個(gè)鎖,然后在獲取第二個(gè)鎖的時(shí)候發(fā)生阻塞,那么這個(gè)線程就可能阻塞其他線程的執(zhí)行,從而導(dǎo)致整個(gè)程序假死。 解決死鎖問題的一種方案是為程序中的每一個(gè)鎖分配一個(gè)唯一的 id,然后只允許按照升序規(guī)則來使用多個(gè)鎖,這個(gè)規(guī)則使用上下文管理器 是非常容易實(shí)現(xiàn)的,示例如下:

如何使用這個(gè)上下文管理器呢?你可以按照正常途徑創(chuàng)建一個(gè)鎖對(duì)象,但不論是單個(gè)鎖還是多個(gè)鎖中都使用 acquire()函數(shù)來申請鎖, 示例如下:

如果你執(zhí)行這段代碼,你會(huì)發(fā)現(xiàn)它即使在不同的函數(shù)中以不同的順序獲取鎖也沒有發(fā)生死鎖。 其關(guān)鍵在于,在第一段代碼中,我們對(duì)這些鎖進(jìn)行了排序。通過排序,使得不管用戶以什么樣的順序來請求鎖,這些鎖都會(huì)按照固定的順序被獲取。 如果有多個(gè) acquire() 操作被嵌套調(diào)用,可以通過線程本地存儲(chǔ)(TLS)來檢測潛在的死鎖問題。 假設(shè)你的代碼是這樣寫的:

如果你運(yùn)行這個(gè)版本的代碼,必定會(huì)有一個(gè)線程發(fā)生崩潰,異常信息可能像這樣:

發(fā)生崩潰的原因在于,每個(gè)線程都記錄著自己已經(jīng)獲取到的鎖。acquire()函數(shù)會(huì)檢查之前已經(jīng)獲取的鎖列表, 由于鎖是按照升序排列獲取的,所以函數(shù)會(huì)認(rèn)為之前已獲取的鎖的 id 必定小于新申請到的鎖,這時(shí)就會(huì)觸發(fā)異常。

討論

死鎖是每一個(gè)多線程程序都會(huì)面臨的一個(gè)問題(就像它是每一本操作系統(tǒng)課本的共同話題一樣)。根據(jù)經(jīng)驗(yàn)來講,盡可能保證每一個(gè) 線程只能同時(shí)保持一個(gè)鎖,這樣程序就不會(huì)被死鎖問題所困擾。一旦有線程同時(shí)申請多個(gè)鎖,一切就不可預(yù)料了。

死鎖的檢測與恢復(fù)是一個(gè)幾乎沒有優(yōu)雅的解決方案的擴(kuò)展話題。一個(gè)比較常用的死鎖檢測與恢復(fù)的方案是引入看門狗計(jì)數(shù)器。當(dāng)線程正常 運(yùn)行的時(shí)候會(huì)每隔一段時(shí)間重置計(jì)數(shù)器,在沒有發(fā)生死鎖的情況下,一切都正常進(jìn)行。一旦發(fā)生死鎖,由于無法重置計(jì)數(shù)器導(dǎo)致定時(shí)器 超時(shí),這時(shí)程序會(huì)通過重啟自身恢復(fù)到正常狀態(tài)。

避免死鎖是另外一種解決死鎖問題的方式,在進(jìn)程獲取鎖的時(shí)候會(huì)嚴(yán)格按照對(duì)象id升序排列獲取,經(jīng)過數(shù)學(xué)證明,這樣保證程序不會(huì)進(jìn)入 死鎖狀態(tài)。證明就留給讀者作為練習(xí)了。避免死鎖的主要思想是,單純地按照對(duì)象 id 遞增的順序加鎖不會(huì)產(chǎn)生循環(huán)依賴,而循環(huán)依賴是 死鎖的一個(gè)必要條件,從而避免程序進(jìn)入死鎖狀態(tài)。

下面以一個(gè)關(guān)于線程死鎖的經(jīng)典問題:“哲學(xué)家就餐問題”,作為本節(jié)最后一個(gè)例子。題目是這樣的:五位哲學(xué)家圍坐在一張桌子前,每個(gè)人 面前有一碗飯和一只筷子。在這里每個(gè)哲學(xué)家可以看做是一個(gè)獨(dú)立的線程,而每只筷子可以看做是一個(gè)鎖。每個(gè)哲學(xué)家可以處在靜坐、 思考、吃飯三種狀態(tài)中的一個(gè)。需要注意的是,每個(gè)哲學(xué)家吃飯是需要兩只筷子的,這樣問題就來了:如果每個(gè)哲學(xué)家都拿起自己左邊的筷子, 那么他們五個(gè)都只能拿著一只筷子坐在那兒,直到餓死。此時(shí)他們就進(jìn)入了死鎖狀態(tài)。 下面是一個(gè)簡單的使用死鎖避免機(jī)制解決“哲學(xué)家就餐問題”的實(shí)現(xiàn):

最后,要特別注意到,為了避免死鎖,所有的加鎖操作必須使用 acquire()函數(shù)。如果代碼中的某部分繞過 acquire 函數(shù)直接申請鎖,那么整個(gè)死鎖避免機(jī)制就不起作用了。

保存線程的狀態(tài)信息

問題

你需要保存正在運(yùn)行線程的狀態(tài),這個(gè)狀態(tài)對(duì)于其他的線程是不可見的。

解決方案

有時(shí)在多線程編程中,你需要只保存當(dāng)前運(yùn)行線程的狀態(tài)。 要這么做,可使用 thread.local() 創(chuàng)建一個(gè)本地線程存儲(chǔ)對(duì)象。 對(duì)這個(gè)對(duì)象的屬性的保存和讀取操作都只會(huì)對(duì)執(zhí)行線程可見,而其他線程并不可見。

作為使用本地存儲(chǔ)的一個(gè)有趣的實(shí)際例子, 考慮在8.3小節(jié)定義過的 LazyConnection 上下文管理器類。 下面我們對(duì)它進(jìn)行一些小的修改使得它可以適用于多線程:

from socket import socket, AF_INET, SOCK_STREAM
import threading

class LazyConnection:
    def __init__(self, address, family=AF_INET, type=SOCK_STREAM):
        self.address = address
        self.family = AF_INET
        self.type = SOCK_STREAM
        self.local = threading.local()

    def __enter__(self):
        if hasattr(self.local, 'sock'):
            raise RuntimeError('Already connected')
        self.local.sock = socket(self.family, self.type)
        self.local.sock.connect(self.address)
        return self.local.sock

    def __exit__(self, exc_ty, exc_val, tb):
        self.local.sock.close()
        del self.local.sock

代碼中,自己觀察對(duì)于self.local屬性的使用。 它被初始化尾一個(gè) threading.local()實(shí)例。 其他方法操作被存儲(chǔ)為 self.local.sock的套接字對(duì)象。 有了這些就可以在多線程中安全的使用 LazyConnection實(shí)例了。例如:

from functools import partial
def test(conn):
    with conn as s:
        s.send(b'GET /index.html HTTP/1.0\r\n')
        s.send(b'Host: www.python.org\r\n')

        s.send(b'\r\n')
        resp = b''.join(iter(partial(s.recv, 8192), b''))

    print('Got {} bytes'.format(len(resp)))

if __name__ == '__main__':
    conn = LazyConnection(('www.python.org', 80))

    t1 = threading.Thread(target=test, args=(conn,))
    t2 = threading.Thread(target=test, args=(conn,))
    t1.start()
    t2.start()
    t1.join()
    t2.join()

它之所以行得通的原因是每個(gè)線程會(huì)創(chuàng)建一個(gè)自己專屬的套接字連接(存儲(chǔ)為 self.local.sock)。 因此,當(dāng)不同的線程執(zhí)行套接字操作時(shí),由于操作的是不同的套接字,因此它們不會(huì)相互影響。

討論

在大部分程序中創(chuàng)建和操作線程特定狀態(tài)并不會(huì)有什么問題。 不過,當(dāng)出了問題的時(shí)候,通常是因?yàn)槟硞€(gè)對(duì)象被多個(gè)線程使用到,用來操作一些專用的系統(tǒng)資源, 比如一個(gè)套接字或文件。你不能讓所有線程貢獻(xiàn)一個(gè)單獨(dú)對(duì)象, 因?yàn)槎鄠€(gè)線程同時(shí)讀和寫的時(shí)候會(huì)產(chǎn)生混亂。 本地線程存儲(chǔ)通過讓這些資源只能在被使用的線程中可見來解決這個(gè)問題。

本節(jié)中,使用 thread.local()可以讓 LazyConnection類支持一個(gè)線程一個(gè)連接, 而不是對(duì)于所有的進(jìn)程都只有一個(gè)連接。

其原理是,每個(gè) threading.local() 實(shí)例為每個(gè)線程維護(hù)著一個(gè)單獨(dú)的實(shí)例字典。 所有普通實(shí)例操作比如獲取、修改和刪除值僅僅操作這個(gè)字典。 每個(gè)線程使用一個(gè)獨(dú)立的字典就可以保證數(shù)據(jù)的隔離了。

創(chuàng)建一個(gè)線程池

問題

你創(chuàng)建一個(gè)工作者線程池,用來相應(yīng)客戶端請求或執(zhí)行其他的工作。

解決方案

concurrent.futures 函數(shù)庫有一個(gè)ThreadPoolExecutor 類可以被用來完成這個(gè)任務(wù)。 下面是一個(gè)簡單的 TCP 服務(wù)器,使用了一個(gè)線程池來響應(yīng)客戶端:

from socket import AF_INET, SOCK_STREAM, socket
from concurrent.futures import ThreadPoolExecutor

def echo_client(sock, client_addr):
    '''
    Handle a client connection
    '''
    print('Got connection from', client_addr)
    while True:
        msg = sock.recv(65536)
        if not msg:
            break
        sock.sendall(msg)
    print('Client closed connection')
    sock.close()

def echo_server(addr):
    pool = ThreadPoolExecutor(128)
    sock = socket(AF_INET, SOCK_STREAM)
    sock.bind(addr)
    sock.listen(5)
    while True:
        client_sock, client_addr = sock.accept()
        pool.submit(echo_client, client_sock, client_addr)

echo_server(('',15000))

如果你想手動(dòng)創(chuàng)建你自己的線程池, 通??梢允褂靡粋€(gè) Queue 來輕松實(shí)現(xiàn)。下面是一個(gè)稍微不同但是手動(dòng)實(shí)現(xiàn)的例子:

from socket import socket, AF_INET, SOCK_STREAM
from threading import Thread
from queue import Queue

def echo_client(q):
    '''
    Handle a client connection
    '''
    sock, client_addr = q.get()
    print('Got connection from', client_addr)
    while True:
        msg = sock.recv(65536)
        if not msg:
            break
        sock.sendall(msg)
    print('Client closed connection')

    sock.close()

def echo_server(addr, nworkers):
    # Launch the client workers
    q = Queue()
    for n in range(nworkers):
        t = Thread(target=echo_client, args=(q,))
        t.daemon = True
        t.start()

    # Run the server
    sock = socket(AF_INET, SOCK_STREAM)
    sock.bind(addr)
    sock.listen(5)
    while True:
        client_sock, client_addr = sock.accept()
        q.put((client_sock, client_addr))

echo_server(('',15000), 128)

使用 ThreadPoolExecutor 相對(duì)于手動(dòng)實(shí)現(xiàn)的一個(gè)好處在于它使得 任務(wù)提交者更方便的從被調(diào)用函數(shù)中獲取返回值。例如,你可能會(huì)像下面這樣寫:

from concurrent.futures import ThreadPoolExecutor
import urllib.request

def fetch_url(url):
    u = urllib.request.urlopen(url)
    data = u.read()
    return data

pool = ThreadPoolExecutor(10)
# Submit work to the pool
a = pool.submit(fetch_url, 'http://www.python.org')
b = pool.submit(fetch_url, 'http://www.pypy.org')

# Get the results back
x = a.result()
y = b.result()

例子中返回的 handle 對(duì)象會(huì)幫你處理所有的阻塞與協(xié)作,然后從工作線程中返回?cái)?shù)據(jù)給你。 特別的,a.result() 操作會(huì)阻塞進(jìn)程直到對(duì)應(yīng)的函數(shù)執(zhí)行完成并返回一個(gè)結(jié)果。

討論

通常來講,你應(yīng)該避免編寫線程數(shù)量可以無限制增長的程序。例如,看看下面這個(gè)服務(wù)器:

from threading import Thread
from socket import socket, AF_INET, SOCK_STREAM

def echo_client(sock, client_addr):
    '''
    Handle a client connection
    '''
    print('Got connection from', client_addr)
    while True:
        msg = sock.recv(65536)
        if not msg:
            break
        sock.sendall(msg)
    print('Client closed connection')
    sock.close()

def echo_server(addr, nworkers):
    # Run the server
    sock = socket(AF_INET, SOCK_STREAM)
    sock.bind(addr)
    sock.listen(5)
    while True:
        client_sock, client_addr = sock.accept()
        t = Thread(target=echo_client, args=(client_sock, client_addr))
        t.daemon = True
        t.start()

echo_server(('',15000))

盡管這個(gè)也可以工作, 但是它不能抵御有人試圖通過創(chuàng)建大量線程讓你服務(wù)器資源枯竭而崩潰的攻擊行為。 通過使用預(yù)先初始化的線程池,你可以設(shè)置同時(shí)運(yùn)行線程的上限數(shù)量。

你可能會(huì)關(guān)心創(chuàng)建大量線程會(huì)有什么后果。 現(xiàn)代操作系統(tǒng)可以很輕松的創(chuàng)建幾千個(gè)線程的線程池。 甚至,同時(shí)幾千個(gè)線程等待工作并不會(huì)對(duì)其他代碼產(chǎn)生性能影響。 當(dāng)然了,如果所有線程同時(shí)被喚醒并立即在 CPU 上執(zhí)行,那就不同了——特別是有了全局解釋器鎖 GIL。 通常,你應(yīng)該只在 I/O 處理相關(guān)代碼中使用線程池。

創(chuàng)建大的線程池的一個(gè)可能需要關(guān)注的問題是內(nèi)存的使用。 例如,如果你在 OS X 系統(tǒng)上面創(chuàng)建2000個(gè)線程,系統(tǒng)顯示 Python 進(jìn)程使用了超過 9 GB 的虛擬內(nèi)存。 不過,這個(gè)計(jì)算通常是有誤差的。當(dāng)創(chuàng)建一個(gè)線程時(shí),操作系統(tǒng)會(huì)預(yù)留一個(gè)虛擬內(nèi)存區(qū)域來 放置線程的執(zhí)行棧(通常是 8 MB 大小)。但是這個(gè)內(nèi)存只有一小片段被實(shí)際映射到真實(shí)內(nèi)存中。 因此,Python 進(jìn)程使用到的真實(shí)內(nèi)存其實(shí)很小 (比如,對(duì)于2000個(gè)線程來講,只使用到了 70 MB 的真實(shí)內(nèi)存,而不是 9 GB)。 如果你擔(dān)心虛擬內(nèi)存大小,可以使用 threading.stack_size() 函數(shù)來降低它。例如:

import threading
threading.stack_size(65536)

如果你加上這條語句并再次運(yùn)行前面的創(chuàng)建2000個(gè)線程試驗(yàn), 你會(huì)發(fā)現(xiàn) Python 進(jìn)程只使用到了大概 210 MB 的虛擬內(nèi)存,而真實(shí)內(nèi)存使用量沒有變。 注意線程棧大小必須至少為32768字節(jié),通常是系統(tǒng)內(nèi)存頁大?。?096、8192等)的整數(shù)倍。

簡單的并行編程

問題

你有個(gè)程序要執(zhí)行 CPU 密集型工作,你想讓他利用多核 CPU 的優(yōu)勢來運(yùn)行的快一點(diǎn)。

解決方案

concurrent.futures庫提供了一個(gè) ProcessPoolExecutor 類, 可被用來在一個(gè)單獨(dú)的 Python 解釋器中執(zhí)行計(jì)算密集型函數(shù)。 不過,要使用它,你首先要有一些計(jì)算密集型的任務(wù)。 我們通過一個(gè)簡單而實(shí)際的例子來演示它。假定你有個(gè) Apache web 服務(wù)器日志目錄的 gzip 壓縮包:

logs/
   20120701.log.gz
   20120702.log.gz
   20120703.log.gz
   20120704.log.gz
   20120705.log.gz
   20120706.log.gz
   ...

進(jìn)一步假設(shè)每個(gè)日志文件內(nèi)容類似下面這樣:

124.115.6.12 - - [10/Jul/2012:00:18:50 -0500] "GET /robots.txt ..." 200 71
210.212.209.67 - - [10/Jul/2012:00:18:51 -0500] "GET /ply/ ..." 200 11875
210.212.209.67 - - [10/Jul/2012:00:18:51 -0500] "GET /favicon.ico ..." 404 369
61.135.216.105 - - [10/Jul/2012:00:20:04 -0500] "GET /blog/atom.xml ..." 304 -
...

下面是一個(gè)腳本,在這些日志文件中查找出所有訪問過 robots.txt 文件的主機(jī):

# findrobots.py

import gzip
import io
import glob

def find_robots(filename):
    '''
    Find all of the hosts that access robots.txt in a single log file
    '''
    robots = set()
    with gzip.open(filename) as f:
        for line in io.TextIOWrapper(f,encoding='ascii'):
            fields = line.split()
            if fields[6] == '/robots.txt':
                robots.add(fields[0])
    return robots

def find_all_robots(logdir):
    '''
    Find all hosts across and entire sequence of files
    '''
    files = glob.glob(logdir+'/*.log.gz')
    all_robots = set()
    for robots in map(find_robots, files):
        all_robots.update(robots)
    return all_robots

if __name__ == '__main__':
    robots = find_all_robots('logs')
    for ipaddr in robots:
        print(ipaddr)

前面的程序使用了通常的 map-reduce 風(fēng)格來編寫。 函數(shù) find_robots()在一個(gè)文件名集合上做 map 操作,并將結(jié)果匯總為一個(gè)單獨(dú)的結(jié)果, 也就是 find_all_robots() 函數(shù)中的 all_robots 集合。 現(xiàn)在,假設(shè)你想要修改這個(gè)程序讓它使用多核 CPU。 很簡單——只需要將 map()操作替換為一個(gè) concurrent.futures 庫中生成的類似操作即可。 下面是一個(gè)簡單修改版本:

# findrobots.py

import gzip
import io
import glob
from concurrent import futures

def find_robots(filename):
    '''
    Find all of the hosts that access robots.txt in a single log file

    '''
    robots = set()
    with gzip.open(filename) as f:
        for line in io.TextIOWrapper(f,encoding='ascii'):
            fields = line.split()
            if fields[6] == '/robots.txt':
                robots.add(fields[0])
    return robots

def find_all_robots(logdir):
    '''
    Find all hosts across and entire sequence of files
    '''
    files = glob.glob(logdir+'/*.log.gz')
    all_robots = set()
    with futures.ProcessPoolExecutor() as pool:
        for robots in pool.map(find_robots, files):
            all_robots.update(robots)
    return all_robots

if __name__ == '__main__':
    robots = find_all_robots('logs')
    for ipaddr in robots:
        print(ipaddr)

通過這個(gè)修改后,運(yùn)行這個(gè)腳本產(chǎn)生同樣的結(jié)果,但是在四核機(jī)器上面比之前快了3.5倍。 實(shí)際的性能優(yōu)化效果根據(jù)你的機(jī)器 CPU 數(shù)量的不同而不同。

討論

ProcessPoolExecutor 的典型用法如下:

from concurrent.futures import ProcessPoolExecutor

with ProcessPoolExecutor() as pool:
    ...
    do work in parallel using pool
    ...

其原理是,一個(gè) ProcessPoolExecutor 創(chuàng)建 N 個(gè)獨(dú)立的 Python 解釋器, N 是系統(tǒng)上面可用 CPU 的個(gè)數(shù)。你可以通過提供可選參數(shù)給ProcessPoolExecutor(N) 來修改 處理器數(shù)量。這個(gè)處理池會(huì)一直運(yùn)行到 with 塊中最后一個(gè)語句執(zhí)行完成, 然后處理池被關(guān)閉。不過,程序會(huì)一直等待直到所有提交的工作被處理完成。

被提交到池中的工作必須被定義為一個(gè)函數(shù)。有兩種方法去提交。 如果你想讓一個(gè)列表推導(dǎo)或一個(gè) map() 操作并行執(zhí)行的話,可使用pool.map() :

# A function that performs a lot of work
def work(x):
    ...
    return result

# Nonparallel code
results = map(work, data)

# Parallel implementation
with ProcessPoolExecutor() as pool:
    results = pool.map(work, data)

另外,你可以使用pool.submit() 來手動(dòng)的提交單個(gè)任務(wù):

# Some function
def work(x):
    ...
    return result

with ProcessPoolExecutor() as pool:
    ...
    # Example of submitting work to the pool
    future_result = pool.submit(work, arg)

    # Obtaining the result (blocks until done)
    r = future_result.result()
    ...

如果你手動(dòng)提交一個(gè)任務(wù),結(jié)果是一個(gè)Future 實(shí)例。 要獲取最終結(jié)果,你需要調(diào)用它的 result()方法。 它會(huì)阻塞進(jìn)程直到結(jié)果被返回來。

如果不想阻塞,你還可以使用一個(gè)回調(diào)函數(shù),例如:

def when_done(r):
    print('Got:', r.result())

with ProcessPoolExecutor() as pool:
     future_result = pool.submit(work, arg)
     future_result.add_done_callback(when_done)

回調(diào)函數(shù)接受一個(gè) Future 實(shí)例,被用來獲取最終的結(jié)果(比如通過調(diào)用它的 result()方法)。 盡管處理池很容易使用,在設(shè)計(jì)大程序的時(shí)候還是有很多需要注意的地方,如下幾點(diǎn):

  • 這種并行處理技術(shù)只適用于那些可以被分解為互相獨(dú)立部分的問題。
  • 被提交的任務(wù)必須是簡單函數(shù)形式。對(duì)于方法、閉包和其他類型的并行執(zhí)行還不支持。
  • 函數(shù)參數(shù)和返回值必須兼容 pickle,因?yàn)橐褂玫竭M(jìn)程間的通信,所有解釋器之間的交換數(shù)據(jù)必須被序列化
  • 被提交的任務(wù)函數(shù)不應(yīng)保留狀態(tài)或有副作用。除了打印日志之類簡單的事情, 一旦啟動(dòng)你不能控制子進(jìn)程的任何行為,因此最好保持簡單和純潔——函數(shù)不要去修改環(huán)境。

  • 在 Unix 上進(jìn)程池通過調(diào)用 fork()系統(tǒng)調(diào)用被創(chuàng)建, 它會(huì)克隆 Python 解釋器,包括 fork 時(shí)的所有程序狀態(tài)。 而在 Windows 上,克隆解釋器時(shí)不會(huì)克隆狀態(tài)。 實(shí)際的 fork 操作會(huì)在第一次調(diào)用pool.map()pool.submit()后發(fā)生。

  • 當(dāng)你混合使用進(jìn)程池和多線程的時(shí)候要特別小心。 你應(yīng)該在創(chuàng)建任何線程之前先創(chuàng)建并激活進(jìn)程池(比如在程序啟動(dòng)的 main 線程中創(chuàng)建進(jìn)程池)。

Python 的全局鎖問題

問題

你已經(jīng)聽說過全局解釋器鎖 GIL,擔(dān)心它會(huì)影響到多線程程序的執(zhí)行性能。

解決方案

盡管 Python 完全支持多線程編程, 但是解釋器的 C 語言實(shí)現(xiàn)部分在完全并行執(zhí)行時(shí)并不是線程安全的。 實(shí)際上,解釋器被一個(gè)全局解釋器鎖保護(hù)著,它確保任何時(shí)候都只有一個(gè) Python 線程執(zhí)行。 GIL 最大的問題就是 Python 的多線程程序并不能利用多核 CPU 的優(yōu)勢 (比如一個(gè)使用了多個(gè)線程的計(jì)算密集型程序只會(huì)在一個(gè)單 CPU 上面運(yùn)行)。

在討論普通的 GIL 之前,有一點(diǎn)要強(qiáng)調(diào)的是 GIL 只會(huì)影響到那些嚴(yán)重依賴 CPU 的程序(比如計(jì)算型的)。 如果你的程序大部分只會(huì)設(shè)計(jì)到 I/O,比如網(wǎng)絡(luò)交互,那么使用多線程就很合適, 因?yàn)樗鼈兇蟛糠謺r(shí)間都在等待。實(shí)際上,你完全可以放心的創(chuàng)建幾千個(gè) Python 線程, 現(xiàn)代操作系統(tǒng)運(yùn)行這么多線程沒有任何壓力,沒啥可擔(dān)心的。

而對(duì)于依賴 CPU 的程序,你需要弄清楚執(zhí)行的計(jì)算的特點(diǎn)。 例如,優(yōu)化底層算法要比使用多線程運(yùn)行快得多。 類似的,由于 Python 是解釋執(zhí)行的,如果你將那些性能瓶頸代碼移到一個(gè) C 語言擴(kuò)展模塊中, 速度也會(huì)提升的很快。如果你要操作數(shù)組,那么使用 NumPy 這樣的擴(kuò)展會(huì)非常的高效。 最后,你還可以考慮下其他可選實(shí)現(xiàn)方案,比如 PyPy,它通過一個(gè) JIT 編譯器來優(yōu)化執(zhí)行效率 (不過在寫這本書的時(shí)候它還不能支持 Python 3)。

還有一點(diǎn)要注意的是,線程不是專門用來優(yōu)化性能的。 一個(gè) CPU 依賴型程序可能會(huì)使用線程來管理一個(gè)圖形用戶界面、一個(gè)網(wǎng)絡(luò)連接或其他服務(wù)。 這時(shí)候,GIL 會(huì)產(chǎn)生一些問題,因?yàn)槿绻粋€(gè)線程長期持有 GIL 的話會(huì)導(dǎo)致其他非 CPU 型線程一直等待。 事實(shí)上,一個(gè)寫的不好的 C 語言擴(kuò)展會(huì)導(dǎo)致這個(gè)問題更加嚴(yán)重, 盡管代碼的計(jì)算部分會(huì)比之前運(yùn)行的更快些。

說了這么多,現(xiàn)在想說的是我們有兩種策略來解決 GIL 的缺點(diǎn)。 首先,如果你完全工作于 Python 環(huán)境中,你可以使用 multiprocessing模塊來創(chuàng)建一個(gè)進(jìn)程池, 并像協(xié)同處理器一樣的使用它。例如,加入你有如下的線程代碼:

# Performs a large calculation (CPU bound)
def some_work(args):
    ...
    return result

# A thread that calls the above function
def some_thread():
    while True:
        ...
        r = some_work(args)
    ...

修改代碼,使用進(jìn)程池:

# Processing pool (see below for initiazation)
pool = None

# Performs a large calculation (CPU bound)
def some_work(args):
    ...
    return result

# A thread that calls the above function
def some_thread():
    while True:
        ...
        r = pool.apply(some_work, (args))
        ...

# Initiaze the pool
if __name__ == '__main__':
    import multiprocessing
    pool = multiprocessing.Pool()

這個(gè)通過使用一個(gè)技巧利用進(jìn)程池解決了 GIL 的問題。 當(dāng)一個(gè)線程想要執(zhí)行 CPU 密集型工作時(shí),會(huì)將任務(wù)發(fā)給進(jìn)程池。 然后進(jìn)程池會(huì)在另外一個(gè)進(jìn)程中啟動(dòng)一個(gè)單獨(dú)的 Python 解釋器來工作。 當(dāng)線程等待結(jié)果的時(shí)候會(huì)釋放 GIL。 并且,由于計(jì)算任務(wù)在單獨(dú)解釋器中執(zhí)行,那么就不會(huì)受限于 GIL 了。 在一個(gè)多核系統(tǒng)上面,你會(huì)發(fā)現(xiàn)這個(gè)技術(shù)可以讓你很好的利用多 CPU 的優(yōu)勢。

另外一個(gè)解決 GIL 的策略是使用 C 擴(kuò)展編程技術(shù)。 主要思想是將計(jì)算密集型任務(wù)轉(zhuǎn)移給 C,跟 Python 獨(dú)立,在工作的時(shí)候在 C 代碼中釋放 GIL。 這可以通過在 C 代碼中插入下面這樣的特殊宏來完成:

#include "Python.h"
...

PyObject *pyfunc(PyObject *self, PyObject *args) {
   ...
   Py_BEGIN_ALLOW_THREADS
   // Threaded C code
   ...
   Py_END_ALLOW_THREADS
   ...
}

如果你使用其他工具訪問 C 語言,比如對(duì)于 Cython 的 ctypes 庫,你不需要做任何事。 例如,ctypes 在調(diào)用 C 時(shí)會(huì)自動(dòng)釋放 GIL。

討論

許多程序員在面對(duì)線程性能問題的時(shí)候,馬上就會(huì)怪罪 GIL,什么都是它的問題。 其實(shí)這樣子太不厚道也太天真了點(diǎn)。 作為一個(gè)真實(shí)的例子,在多線程的網(wǎng)絡(luò)編程中神秘的 stalls可能是因?yàn)槠渌虮热缫粋€(gè) DNS 查找延時(shí),而跟 GIL 毫無關(guān)系。 最后你真的需要先去搞懂你的代碼是否真的被 GIL 影響到。 同時(shí)還要明白 GIL 大部分都應(yīng)該只關(guān)注 CPU 的處理而不是 I/O.

如果你準(zhǔn)備使用一個(gè)處理器池,注意的是這樣做涉及到數(shù)據(jù)序列化和在不同 Python 解釋器通信。 被執(zhí)行的操作需要放在一個(gè)通過 def 語句定義的 Python 函數(shù)中,不能是 lambda、閉包可調(diào)用實(shí)例等, 并且函數(shù)參數(shù)和返回值必須要兼容 pickle。 同樣,要執(zhí)行的任務(wù)量必須足夠大以彌補(bǔ)額外的通宵開銷。

另外一個(gè)難點(diǎn)是當(dāng)混合使用線程和進(jìn)程池的時(shí)候會(huì)讓你很頭疼。 如果你要同時(shí)使用兩者,最好在程序啟動(dòng)時(shí),創(chuàng)建任何線程之前先創(chuàng)建一個(gè)單例的進(jìn)程池。 然后線程使用同樣的進(jìn)程池來進(jìn)行它們的計(jì)算密集型工作。

C 擴(kuò)展最重要的特征是它們和 Python 解釋器是保持獨(dú)立的。 也就是說,如果你準(zhǔn)備將 Python 中的任務(wù)分配到 C 中去執(zhí)行, 你需要確保 C 代碼的操作跟 Python 保持獨(dú)立, 這就意味著不要使用 Python 數(shù)據(jù)結(jié)構(gòu)以及不要調(diào)用 Python 的 C API。 另外一個(gè)就是你要確保 C 擴(kuò)展所做的工作是足夠的,值得你這樣做。 也就是說 C 擴(kuò)展擔(dān)負(fù)起了大量的計(jì)算任務(wù),而不是少數(shù)幾個(gè)計(jì)算。

這些解決 GIL 的方案并不能適用于所有問題。 例如,某些類型的應(yīng)用程序如果被分解為多個(gè)進(jìn)程處理的話并不能很好的工作, 也不能將它的部分代碼改成C語言執(zhí)行。 對(duì)于這些應(yīng)用程序,你就要自己需求解決方案了 (比如多進(jìn)程訪問共享內(nèi)存區(qū),多解析器運(yùn)行于同一個(gè)進(jìn)程等)。 或者,你還可以考慮下其他的解釋器實(shí)現(xiàn),比如 PyPy。

了解更多關(guān)于在 C 擴(kuò)展中釋放 GIL,請參考15.7和15.10小節(jié)。

定義一個(gè) Actor 任務(wù)

問題

你想定義跟 actor 模式中類似“actors”角色的任務(wù)

解決方案

actore 模式是一種最古老的也是最簡單的并行和分布式計(jì)算解決方案。 事實(shí)上,它天生的簡單性是它如此受歡迎的重要原因之一。 簡單來講,一個(gè) actor 就是一個(gè)并發(fā)執(zhí)行的任務(wù),只是簡單的執(zhí)行發(fā)送給它的消息任務(wù)。 響應(yīng)這些消息時(shí),它可能還會(huì)給其他 actor 發(fā)送更進(jìn)一步的消息。 actor 之間的通信是單向和異步的。因此,消息發(fā)送者不知道消息是什么時(shí)候被發(fā)送, 也