鍍金池/ 教程/ Python/ 網(wǎng)絡(luò)與 Web 編程
類與對象
模塊與包
數(shù)據(jù)編碼和處理
元編程
網(wǎng)絡(luò)與 Web 編程
數(shù)字日期和時(shí)間
測試、調(diào)試和異常
字符串和文本
文件與 IO
腳本編程與系統(tǒng)管理
迭代器與生成器
函數(shù)
C 語言擴(kuò)展
并發(fā)編程
數(shù)據(jù)結(jié)構(gòu)和算法

網(wǎng)絡(luò)與 Web 編程

本章是關(guān)于在網(wǎng)絡(luò)應(yīng)用和分布式應(yīng)用中使用的各種主題。主題劃分為使用 Python 編寫客戶端程序來訪問已有的服務(wù),以及使用 Python 實(shí)現(xiàn)網(wǎng)絡(luò)服務(wù)端程序。也給出了一些常見的技術(shù),用于編寫涉及協(xié)同或通信的的代碼。

作為客戶端與 HTTP 服務(wù)交互

問題

你需要通過 HTTP 協(xié)議以客戶端的方式訪問多種服務(wù)。例如,下載數(shù)據(jù)或者與基于 REST 的 API 進(jìn)行交互。

解決方案

對于簡單的事情來說,通常使用 urllib.request 模塊就夠了。例如,發(fā)送一個(gè)簡單的 HTTP GET 請求到遠(yuǎn)程的服務(wù)上,可以這樣做:

from urllib import request, parse

# Base URL being accessed
url = 'http://httpbin.org/get'

# Dictionary of query parameters (if any)
parms = {
   'name1' : 'value1',
   'name2' : 'value2'
}

# Encode the query string
querystring = parse.urlencode(parms)

# Make a GET request and read the response
u = request.urlopen(url+'?' + querystring)
resp = u.read()

如果你需要使用 POST 方法在請求主體中發(fā)送查詢參數(shù),可以將參數(shù)編碼后作為可選參數(shù)提供給 urlopen() 函數(shù),就像這樣:

from urllib import request, parse

# Base URL being accessed
url = 'http://httpbin.org/post'

# Dictionary of query parameters (if any)
parms = {
   'name1' : 'value1',
   'name2' : 'value2'
}

# Encode the query string
querystring = parse.urlencode(parms)

# Make a POST request and read the response
u = request.urlopen(url, querystring.encode('ascii'))
resp = u.read()

如果你需要在發(fā)出的請求中提供一些自定義的 HTTP 頭,例如修改 user-agent 字段,可以創(chuàng)建一個(gè)包含字段值的字典,并創(chuàng)建一個(gè) Request 實(shí)例然后將其傳給 urlopen() ,如下:

from urllib import request, parse
...

# Extra headers
headers = {
    'User-agent' : 'none/ofyourbusiness',
    'Spam' : 'Eggs'
}

req = request.Request(url, querystring.encode('ascii'), headers=headers)

# Make a request and read the response
u = request.urlopen(req)
resp = u.read()

如果需要交互的服務(wù)比上面的例子都要復(fù)雜,也許應(yīng)該去看看 requests 庫(https://pypi.python.org/pypi/requests)。例如,下面這個(gè)示例采用 requests 庫重新實(shí)現(xiàn)了上面的操作:

import requests

# Base URL being accessed
url = 'http://httpbin.org/post'

# Dictionary of query parameters (if any)
parms = {
   'name1' : 'value1',
   'name2' : 'value2'
}

# Extra headers
headers = {
    'User-agent' : 'none/ofyourbusiness',
    'Spam' : 'Eggs'
}

resp = requests.post(url, data=parms, headers=headers)

# Decoded text returned by the request
text = resp.text

關(guān)于 requests 庫,一個(gè)值得一提的特性就是它能以多種方式從請求中返回響應(yīng)結(jié)果的內(nèi)容。從上面的代碼來看, resp.text 帶給我們的是以 Unicode 解碼的響應(yīng)文本。但是,如果去訪問 resp.content ,就會得到原始的二進(jìn)制數(shù)據(jù)。另一方面,如果訪問 resp.json ,那么就會得到 JSON 格式的響應(yīng)內(nèi)容。

下面這個(gè)示例利用 requests 庫發(fā)起一個(gè) HEAD 請求,并從響應(yīng)中提取出一些 HTTP 頭數(shù)據(jù)的字段:

import requests

resp = requests.head('http://www.python.org/index.html')

status = resp.status_code
last_modified = resp.headers['last-modified']
content_type = resp.headers['content-type']
content_length = resp.headers['content-length']

Here is a requests example that executes a login into the Python Package index using
basic authentication:
import requests

resp = requests.get('http://pypi.python.org/pypi?:action=login',
                    auth=('user','password'))

Here is an example of using requests to pass HTTP cookies from one request to the
next:

import requests

# First request
resp1 = requests.get(url)
...

# Second requests with cookies received on first requests
resp2 = requests.get(url, cookies=resp1.cookies)

Last, but not least, here is an example of using requests to upload content:

import requests
url = 'http://httpbin.org/post'
files = { 'file': ('data.csv', open('data.csv', 'rb')) }

r = requests.post(url, files=files)

討論

對于真的很簡單 HTTP 客戶端代碼,用內(nèi)置的 urllib 模塊通常就足夠了。但是,如果你要做的不僅僅只是簡單的 GET 或 POST 請求,那就真的不能再依賴它的功能了。這時(shí)候就是第三方模塊比如 requests大顯身手的時(shí)候了。

例如,如果你決定堅(jiān)持使用標(biāo)準(zhǔn)的程序庫而不考慮像 requests 這樣的第三方庫,那么也許就不得不使用底層的 http.client 模塊來實(shí)現(xiàn)自己的代碼。比方說,下面的代碼展示了如何執(zhí)行一個(gè) HEAD 請求:

from http.client import HTTPConnection
from urllib import parse

c = HTTPConnection('www.python.org', 80)
c.request('HEAD', '/index.html')
resp = c.getresponse()

print('Status', resp.status)
for name, value in resp.getheaders():
    print(name, value)

同樣地,如果必須編寫涉及代理、認(rèn)證、cookies 以及其他一些細(xì)節(jié)方面的代碼,那么使用 urllib就顯得特別別扭和啰嗦。比方說,下面這個(gè)示例實(shí)現(xiàn)在 Python 包索引上的認(rèn)證:

import urllib.request

auth = urllib.request.HTTPBasicAuthHandler()
auth.add_password('pypi','http://pypi.python.org','username','password')
opener = urllib.request.build_opener(auth)

r = urllib.request.Request('http://pypi.python.org/pypi?:action=login')
u = opener.open(r)
resp = u.read()

# From here. You can access more pages using opener
...

坦白說,所有的這些操作在 requests庫中都變得簡單的多。

在開發(fā)過程中測試 HTTP 客戶端代碼常常是很令人沮喪的,因?yàn)樗屑值募?xì)節(jié)問題都需要考慮(例如 cookies、認(rèn)證、HTTP 頭、編碼方式等)。要完成這些任務(wù),考慮使用 httpbin 服務(wù)(http://httpbin.org)。這個(gè)站點(diǎn)會接收發(fā)出的請求,然后以 JSON 的形式將相應(yīng)信息回傳回來。下面是一個(gè)交互式的例子:

>>> import requests
>>> r = requests.get('http://httpbin.org/get?name=Dave&n=37',
...     headers = { 'User-agent': 'goaway/1.0' })
>>> resp = r.json
>>> resp['headers']
{'User-Agent': 'goaway/1.0', 'Content-Length': '', 'Content-Type': '',
'Accept-Encoding': 'gzip, deflate, compress', 'Connection':
'keep-alive', 'Host': 'httpbin.org', 'Accept': '*/*'}
>>> resp['args']
{'name': 'Dave', 'n': '37'}
>>>

在要同一個(gè)真正的站點(diǎn)進(jìn)行交互前,先在 httpbin.org 這樣的網(wǎng)站上做實(shí)驗(yàn)常常是可取的辦法。尤其是當(dāng)我們面對3次登錄失敗就會關(guān)閉賬戶這樣的風(fēng)險(xiǎn)時(shí)尤為有用(不要嘗試自己編寫 HTTP 認(rèn)證客戶端來登錄你的銀行賬戶)。

盡管本節(jié)沒有涉及, request 庫還對許多高級的 HTTP 客戶端協(xié)議提供了支持,比如 OAuth。 requests 模塊的文檔(http://docs.python-requests.org)質(zhì)量很高(坦白說比在這短短的一節(jié)的篇幅中所提供的任何信息都好),可以參考文檔以獲得更多地信息。

創(chuàng)建 TCP 服務(wù)器

問題

你想實(shí)現(xiàn)一個(gè)服務(wù)器,通過 TCP 協(xié)議和客戶端通信。

解決方案

創(chuàng)建一個(gè) TCP 服務(wù)器的一個(gè)簡單方法是使用 socketserver 庫。例如,下面是一個(gè)簡單的應(yīng)答服務(wù)器:

from socketserver import BaseRequestHandler, TCPServer

class EchoHandler(BaseRequestHandler):
    def handle(self):
        print('Got connection from', self.client_address)
        while True:

            msg = self.request.recv(8192)
            if not msg:
                break
            self.request.send(msg)

if __name__ == '__main__':
    serv = TCPServer(('', 20000), EchoHandler)
    serv.serve_forever()

在這段代碼中,你定義了一個(gè)特殊的處理類,實(shí)現(xiàn)了一個(gè) handle() 方法,用來為客戶端連接服務(wù)。request屬性是客戶端 socket,client_address 有客戶端地址。 為了測試這個(gè)服務(wù)器,運(yùn)行它并打開另外一個(gè) Python 進(jìn)程連接這個(gè)服務(wù)器:

>>> from socket import socket, AF_INET, SOCK_STREAM
>>> s = socket(AF_INET, SOCK_STREAM)
>>> s.connect(('localhost', 20000))
>>> s.send(b'Hello')
5
>>> s.recv(8192)
b'Hello'
>>>

很多時(shí)候,可以很容易的定義一個(gè)不同的處理器。下面是一個(gè)使用 StreamRequestHandler基類將一個(gè)類文件接口放置在底層 socket 上的例子:

from socketserver import StreamRequestHandler, TCPServer

class EchoHandler(StreamRequestHandler):
    def handle(self):
        print('Got connection from', self.client_address)
        # self.rfile is a file-like object for reading
        for line in self.rfile:
            # self.wfile is a file-like object for writing
            self.wfile.write(line)

if __name__ == '__main__':
    serv = TCPServer(('', 20000), EchoHandler)
    serv.serve_forever()

討論

socketserver 可以讓我們很容易的創(chuàng)建簡單的 TCP 服務(wù)器。 但是,你需要注意的是,默認(rèn)情況下這種服務(wù)器是單線程的,一次只能為一個(gè)客戶端連接服務(wù)。 如果你想處理多個(gè)客戶端,可以初始化一個(gè) ForkingTCPServer 或者是 ThreadingTCPServer 對象。例如:

from socketserver import ThreadingTCPServer

if __name__ == '__main__':
    serv = ThreadingTCPServer(('', 20000), EchoHandler)
    serv.serve_forever()

使用 fork 或線程服務(wù)器有個(gè)潛在問題就是它們會為每個(gè)客戶端連接創(chuàng)建一個(gè)新的進(jìn)程或線程。 由于客戶端連接數(shù)是沒有限制的,因此一個(gè)惡意的黑客可以同時(shí)發(fā)送大量的連接讓你的服務(wù)器奔潰。

如果你擔(dān)心這個(gè)問題,你可以創(chuàng)建一個(gè)預(yù)先分配大小的工作線程池或進(jìn)程池。 你先創(chuàng)建一個(gè)普通的非線程服務(wù)器,然后在一個(gè)線程池中使用 serve_forever() 方法來啟動它們。

if __name__ == '__main__':
    from threading import Thread
    NWORKERS = 16
    serv = TCPServer(('', 20000), EchoHandler)
    for n in range(NWORKERS):
        t = Thread(target=serv.serve_forever)
        t.daemon = True
        t.start()
    serv.serve_forever()

一般來講,一個(gè) TCPServer 在實(shí)例化的時(shí)候會綁定并激活相應(yīng)的 socket。 不過,有時(shí)候你想通過設(shè)置某些選項(xiàng)去調(diào)整底下的 socket ,可以設(shè)置參數(shù) bind_and_activate=False。如下:

if __name__ == '__main__':
    serv = TCPServer(('', 20000), EchoHandler, bind_and_activate=False)
    # Set up various socket options
    serv.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
    # Bind and activate
    serv.server_bind()
    serv.server_activate()
    serv.serve_forever()

上面的socket選項(xiàng)是一個(gè)非常普遍的配置項(xiàng),它允許服務(wù)器重新綁定一個(gè)之前使用過的端口號。 由于要被經(jīng)常使用到,它被放置到類變量中,可以直接在 TCPServer 上面設(shè)置。 在實(shí)例化服務(wù)器的時(shí)候去設(shè)置它的值,如下所示:

if __name__ == '__main__':
    TCPServer.allow_reuse_address = True
    serv = TCPServer(('', 20000), EchoHandler)
    serv.serve_forever()

在上面示例中,我們演示了兩種不同的處理器基類( BaseRequestHandlerStreamRequestHandler )。 StreamRequestHandler更加靈活點(diǎn),能通過設(shè)置其他的類變量來支持一些新的特性。比如:

import socket

class EchoHandler(StreamRequestHandler):
    # Optional settings (defaults shown)
    timeout = 5                      # Timeout on all socket operations
    rbufsize = -1                    # Read buffer size
    wbufsize = 0                     # Write buffer size
    disable_nagle_algorithm = False  # Sets TCP_NODELAY socket option
    def handle(self):
        print('Got connection from', self.client_address)
        try:
            for line in self.rfile:
                # self.wfile is a file-like object for writing
                self.wfile.write(line)
        except socket.timeout:
            print('Timed out!')

最后,還需要注意的是巨大部分 Python 的高層網(wǎng)絡(luò)模塊(比如 HTTP、XML-RPC 等)都是建立在 socketserver 功能之上。 也就是說,直接使用 socket 庫來實(shí)現(xiàn)服務(wù)器也并不是很難。 下面是一個(gè)使用 socket直接編程實(shí)現(xiàn)的一個(gè)服務(wù)器簡單例子:

from socket import socket, AF_INET, SOCK_STREAM

def echo_handler(address, client_sock):
    print('Got connection from {}'.format(address))
    while True:
        msg = client_sock.recv(8192)
        if not msg:
            break
        client_sock.sendall(msg)
    client_sock.close()

def echo_server(address, backlog=5):
    sock = socket(AF_INET, SOCK_STREAM)
    sock.bind(address)
    sock.listen(backlog)
    while True:
        client_sock, client_addr = sock.accept()
        echo_handler(client_addr, client_sock)

if __name__ == '__main__':
    echo_server(('', 20000))

創(chuàng)建 UDP 服務(wù)器

問題

你想實(shí)現(xiàn)一個(gè)基于 UDP 協(xié)議的服務(wù)器來與客戶端通信。

解決方案

跟 TCP 一樣,UDP 服務(wù)器也可以通過使用 socketserver 庫很容易的被創(chuàng)建。 例如,下面是一個(gè)簡單的時(shí)間服務(wù)器:

from socketserver import BaseRequestHandler, UDPServer
import time

class TimeHandler(BaseRequestHandler):
    def handle(self):
        print('Got connection from', self.client_address)
        # Get message and client socket
        msg, sock = self.request
        resp = time.ctime()
        sock.sendto(resp.encode('ascii'), self.client_address)

if __name__ == '__main__':
    serv = UDPServer(('', 20000), TimeHandler)
    serv.serve_forever()

跟之前一樣,你先定義一個(gè)實(shí)現(xiàn) handle()特殊方法的類,為客戶端連接服務(wù)。 這個(gè)類的 request 屬性是一個(gè)包含了數(shù)據(jù)報(bào)和底層 socket 對象的元組。client_address包含了客戶端地址。

我們來測試下這個(gè)服務(wù)器,首先運(yùn)行它,然后打開另外一個(gè) Python 進(jìn)程向服務(wù)器發(fā)送消息:

>>> from socket import socket, AF_INET, SOCK_DGRAM
>>> s = socket(AF_INET, SOCK_DGRAM)
>>> s.sendto(b'', ('localhost', 20000))
0
>>> s.recvfrom(8192)
(b'Wed Aug 15 20:35:08 2012', ('127.0.0.1', 20000))
>>>

討論

一個(gè)典型的 UPD 服務(wù)器接收到達(dá)的數(shù)據(jù)報(bào)(消息)和客戶端地址。如果服務(wù)器需要做應(yīng)答, 它要給客戶端回發(fā)一個(gè)數(shù)據(jù)報(bào)。對于數(shù)據(jù)報(bào)的傳送, 你應(yīng)該使用 socket 的 sendto()recvfrom()方法。 盡管傳統(tǒng)的 send()recv()也可以達(dá)到同樣的效果, 但是前面的兩個(gè)方法對于 UDP 連接而言更普遍。

由于沒有底層的連接,UPD 服務(wù)器相對于 TCP 服務(wù)器來講實(shí)現(xiàn)起來更加簡單。 不過,UDP 天生是不可靠的(因?yàn)橥ㄐ艣]有建立連接,消息可能丟失)。 因此需要由你自己來決定該怎樣處理丟失消息的情況。這個(gè)已經(jīng)不在本書討論范圍內(nèi)了, 不過通常來說,如果可靠性對于你程序很重要,你需要借助于序列號、重試、超時(shí)以及一些其他方法來保證。 UDP 通常被用在那些對于可靠傳輸要求不是很高的場合。例如,在實(shí)時(shí)應(yīng)用如多媒體流以及游戲領(lǐng)域, 無需返回恢復(fù)丟失的數(shù)據(jù)包(程序只需簡單的忽略它并繼續(xù)向前運(yùn)行)。

UDPServer類是單線程的,也就是說一次只能為一個(gè)客戶端連接服務(wù)。 實(shí)際使用中,這個(gè)無論是對于 UDP 還是 TCP 都不是什么大問題。 如果你想要并發(fā)操作,可以實(shí)例化一個(gè) ForkingUDPServerThreadingUDPServer 對象:

from socketserver import ThreadingUDPServer

   if __name__ == '__main__':
    serv = ThreadingUDPServer(('',20000), TimeHandler)
    serv.serve_forever()

直接使用 socket 來是想一個(gè) UDP 服務(wù)器也不難,下面是一個(gè)例子:

from socket import socket, AF_INET, SOCK_DGRAM
import time

def time_server(address):
    sock = socket(AF_INET, SOCK_DGRAM)
    sock.bind(address)
    while True:
        msg, addr = sock.recvfrom(8192)
        print('Got message from', addr)
        resp = time.ctime()
        sock.sendto(resp.encode('ascii'), addr)

if __name__ == '__main__':
    time_server(('', 20000))

通過 CIDR 地址生成對應(yīng)的 IP 地址集

問題

你有一個(gè) CIDR 網(wǎng)絡(luò)地址比如“123.45.67.89/27”,你想將其轉(zhuǎn)換成它所代表的所有 IP (比如,“123.45.67.64”, “123.45.67.65”, …, “123.45.67.95”))

解決方案

可以使用 ipaddress 模塊很容易的實(shí)現(xiàn)這樣的計(jì)算。例如:

>>> import ipaddress
>>> net = ipaddress.ip_network('123.45.67.64/27')
>>> net
IPv4Network('123.45.67.64/27')
>>> for a in net:
...     print(a)
...
123.45.67.64
123.45.67.65
123.45.67.66
123.45.67.67
123.45.67.68
...
123.45.67.95
>>>

>>> net6 = ipaddress.ip_network('12:3456:78:90ab:cd:ef01:23:30/125')
>>> net6
IPv6Network('12:3456:78:90ab:cd:ef01:23:30/125')
>>> for a in net6:
...     print(a)
...
12:3456:78:90ab:cd:ef01:23:30
12:3456:78:90ab:cd:ef01:23:31
12:3456:78:90ab:cd:ef01:23:32
12:3456:78:90ab:cd:ef01:23:33
12:3456:78:90ab:cd:ef01:23:34
12:3456:78:90ab:cd:ef01:23:35
12:3456:78:90ab:cd:ef01:23:36
12:3456:78:90ab:cd:ef01:23:37
>>>

Network也允許像數(shù)組一樣的索引取值,例如:

>>> net.num_addresses
32
>>> net[0]

IPv4Address('123.45.67.64')
>>> net[1]
IPv4Address('123.45.67.65')
>>> net[-1]
IPv4Address('123.45.67.95')
>>> net[-2]
IPv4Address('123.45.67.94')
>>>

另外,你還可以執(zhí)行網(wǎng)絡(luò)成員檢查之類的操作:

>>> a = ipaddress.ip_address('123.45.67.69')
>>> a in net
True
>>> b = ipaddress.ip_address('123.45.67.123')
>>> b in net
False
>>>

一個(gè) IP 地址和網(wǎng)絡(luò)地址能通過一個(gè) IP 接口來指定,例如:

>>> inet = ipaddress.ip_interface('123.45.67.73/27')
>>> inet.network
IPv4Network('123.45.67.64/27')
>>> inet.ip
IPv4Address('123.45.67.73')
>>>

討論

ipaddress 模塊有很多類可以表示 IP 地址、網(wǎng)絡(luò)和接口。 當(dāng)你需要操作網(wǎng)絡(luò)地址(比如解析、打印、驗(yàn)證等)的時(shí)候會很有用。

要注意的是,ipaddress模塊跟其他一些和網(wǎng)絡(luò)相關(guān)的模塊比如 socket庫交集很少。 所以,你不能使用 IPv4Address 的實(shí)例來代替一個(gè)地址字符串,你首先得顯式的使用str()轉(zhuǎn)換它。例如:

>>> a = ipaddress.ip_address('127.0.0.1')
>>> from socket import socket, AF_INET, SOCK_STREAM
>>> s = socket(AF_INET, SOCK_STREAM)
>>> s.connect((a, 8080))
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
TypeError: Can't convert 'IPv4Address' object to str implicitly
>>> s.connect((str(a), 8080))
>>>

更多相關(guān)內(nèi)容,請參考 An Introduction to the ipaddress Module

創(chuàng)建一個(gè)簡單的 REST 接口

問題

你想使用一個(gè)簡單的 REST 接口通過網(wǎng)絡(luò)遠(yuǎn)程控制或訪問你的應(yīng)用程序,但是你又不想自己去安裝一個(gè)完整的 web 框架。

解決方案

構(gòu)建一個(gè) REST 風(fēng)格的接口最簡單的方法是創(chuàng)建一個(gè)基于 WSGI 標(biāo)準(zhǔn)(PEP 3333)的很小的庫,下面是一個(gè)例子:

# resty.py

import cgi

def notfound_404(environ, start_response):
    start_response('404 Not Found', [ ('Content-type', 'text/plain') ])
    return [b'Not Found']

class PathDispatcher:
    def __init__(self):
        self.pathmap = { }

    def __call__(self, environ, start_response):
        path = environ['PATH_INFO']
        params = cgi.FieldStorage(environ['wsgi.input'],
                                  environ=environ)
        method = environ['REQUEST_METHOD'].lower()
        environ['params'] = { key: params.getvalue(key) for key in params }
        handler = self.pathmap.get((method,path), notfound_404)
        return handler(environ, start_response)

    def register(self, method, path, function):
        self.pathmap[method.lower(), path] = function
        return function

為了使用這個(gè)調(diào)度器,你只需要編寫不同的處理器,就像下面這樣:

import time

_hello_resp = '''\
<html>
  <head>
     <title>Hello {name}</title>
   </head>
   <body>
     <h1>Hello {name}!</h1>
   </body>
</html>'''

def hello_world(environ, start_response):
    start_response('200 OK', [ ('Content-type','text/html')])
    params = environ['params']
    resp = _hello_resp.format(name=params.get('name'))
    yield resp.encode('utf-8')

_localtime_resp = '''\
<?xml version="1.0"?>
<time>
  <year>{t.tm_year}</year>
  <month>{t.tm_mon}</month>
  <day>{t.tm_mday}</day>
  <hour>{t.tm_hour}</hour>
  <minute>{t.tm_min}</minute>
  <second>{t.tm_sec}</second>
</time>'''

def localtime(environ, start_response):
    start_response('200 OK', [ ('Content-type', 'application/xml') ])
    resp = _localtime_resp.format(t=time.localtime())
    yield resp.encode('utf-8')

if __name__ == '__main__':
    from resty import PathDispatcher
    from wsgiref.simple_server import make_server

    # Create the dispatcher and register functions
    dispatcher = PathDispatcher()
    dispatcher.register('GET', '/hello', hello_world)
    dispatcher.register('GET', '/localtime', localtime)

    # Launch a basic server
    httpd = make_server('', 8080, dispatcher)
    print('Serving on port 8080...')
    httpd.serve_forever()

要測試下這個(gè)服務(wù)器,你可以使用一個(gè)瀏覽器或 urllib和它交互。例如:

>>> u = urlopen('http://localhost:8080/hello?name=Guido')
>>> print(u.read().decode('utf-8'))
<html>
  <head>
     <title>Hello Guido</title>
   </head>
   <body>
     <h1>Hello Guido!</h1>
   </body>
</html>

>>> u = urlopen('http://localhost:8080/localtime')
>>> print(u.read().decode('utf-8'))
<?xml version="1.0"?>
<time>
  <year>2012</year>
  <month>11</month>
  <day>24</day>
  <hour>14</hour>
  <minute>49</minute>
  <second>17</second>
</time>
>>>

討論

在編寫 REST 接口時(shí),通常都是服務(wù)于普通的 HTTP 請求。但是跟那些功能完整的網(wǎng)站相比,你通常只需要處理數(shù)據(jù)。 這些數(shù)據(jù)以各種標(biāo)準(zhǔn)格式編碼,比如 XML、JSON 或 CSV。 盡管程序看上去很簡單,但是以這種方式提供的 API 對于很多應(yīng)用程序來講是非常有用的。

例如,長期運(yùn)行的程序可能會使用一個(gè) REST API 來實(shí)現(xiàn)監(jiān)控或診斷。 大數(shù)據(jù)應(yīng)用程序可以使用 REST 來構(gòu)建一個(gè)數(shù)據(jù)查詢或提取系統(tǒng)。 REST 還能用來控制硬件設(shè)備比如機(jī)器人、傳感器、工廠或燈泡。 更重要的是,REST API 已經(jīng)被大量客戶端編程環(huán)境所支持,比如 Javascript, Android, iOS 等。 因此,利用這種接口可以讓你開發(fā)出更加復(fù)雜的應(yīng)用程序。

為了實(shí)現(xiàn)一個(gè)簡單的 REST 接口,你只需讓你的程序代碼滿足 Python 的 WSGI 標(biāo)準(zhǔn)即可。 WSGI 被標(biāo)準(zhǔn)庫支持,同時(shí)也被絕大部分第三方 web 框架支持。 因此,如果你的代碼遵循這個(gè)標(biāo)準(zhǔn),在后面的使用過程中就會更加的靈活!

在 WSGI 中,你可以像下面這樣約定的方式以一個(gè)可調(diào)用對象形式來實(shí)現(xiàn)你的程序。

import cgi

def wsgi_app(environ, start_response):
    pass

environ屬性是一個(gè)字典,包含了從 web 服務(wù)器如 Apache [參考 Internet RFC 3875]提供的 CGI 接口中獲取的值。 要將這些不同的值提取出來,你可以像這么這樣寫:

def wsgi_app(environ, start_response):
    method = environ['REQUEST_METHOD']
    path = environ['PATH_INFO']
    # Parse the query parameters
    params = cgi.FieldStorage(environ['wsgi.input'], environ=environ)

我們展示了一些常見的值。environ['REQUEST_METHOD']代表請求類型如 GET、POST、HEAD 等。 environ['PATH_INFO']表示被請求資源的路徑。 調(diào)用 cgi.FieldStorage()可以從請求中提取查詢參數(shù)并將它們放入一個(gè)類字典對象中以便后面使用。

start_response參數(shù)是一個(gè)為了初始化一個(gè)請求對象而必須被調(diào)用的函數(shù)。 第一個(gè)參數(shù)是返回的 HTTP 狀態(tài)值,第二個(gè)參數(shù)是一個(gè)(名,值)元組列表,用來構(gòu)建返回的 HTTP 頭。例如:

def wsgi_app(environ, start_response):
    pass
    start_response('200 OK', [('Content-type', 'text/plain')])

為了返回?cái)?shù)據(jù),一個(gè) WSGI 程序必須返回一個(gè)字節(jié)字符串序列??梢韵裣旅孢@樣使用一個(gè)列表來完成:

def wsgi_app(environ, start_response):
    pass
    start_response('200 OK', [('Content-type', 'text/plain')])
    resp = []
    resp.append(b'Hello World\n')
    resp.append(b'Goodbye!\n')
    return resp

或者,你還可以使用yield

def wsgi_app(environ, start_response):
    pass
    start_response('200 OK', [('Content-type', 'text/plain')])
    yield b'Hello World\n'
    yield b'Goodbye!\n'

這里要強(qiáng)調(diào)的一點(diǎn)是最后返回的必須是字節(jié)字符串。如果返回結(jié)果包含文本字符串,必須先將其編碼成字節(jié)。 當(dāng)然,并沒有要求你返回的一點(diǎn)是文本,你可以很輕松的編寫一個(gè)生成圖片的程序。

盡管 WSGI 程序通常被定義成一個(gè)函數(shù),不過你也可以使用類實(shí)例來實(shí)現(xiàn),只要它實(shí)現(xiàn)了合適的 __call__() 方法。例如:

class WSGIApplication:
    def __init__(self):
        ...
    def __call__(self, environ, start_response)
       ...

我們已經(jīng)在上面使用這種技術(shù)創(chuàng)建 PathDispatcher 類。 這個(gè)分發(fā)器僅僅只是管理一個(gè)字典,將(方法,路徑)對映射到處理器函數(shù)上面。 當(dāng)一個(gè)請求到來時(shí),它的方法和路徑被提取出來,然后被分發(fā)到對應(yīng)的處理器上面去。 另外,任何查詢變量會被解析后放到一個(gè)字典中,以 environ['params'] 形式存儲。 后面這個(gè)步驟太常見,所以建議你在分發(fā)器里面完成,這樣可以省掉很多重復(fù)代碼。 使用分發(fā)器的時(shí)候,你只需簡單的創(chuàng)建一個(gè)實(shí)例,然后通過它注冊各種 WSGI 形式的函數(shù)。 編寫這些函數(shù)應(yīng)該超級簡單了,只要你遵循 start_response() 函數(shù)的編寫規(guī)則,并且最后返回字節(jié)字符串即可。

當(dāng)編寫這種函數(shù)的時(shí)候還需注意的一點(diǎn)就是對于字符串模板的使用。 沒人愿意寫那種到處混合著 print() 函數(shù) 、XML 和大量格式化操作的代碼。 我們上面使用了三引號包含的預(yù)先定義好的字符串模板。 這種方式的可以讓我們很容易的在以后修改輸出格式(只需要修改模板本身,而不用動任何使用它的地方)。

最后,使用 WSGI 還有一個(gè)很重要的部分就是沒有什么地方是針對特定 web 服務(wù)器的。 因?yàn)闃?biāo)準(zhǔn)對于服務(wù)器和框架是中立的,你可以將你的程序放入任何類型服務(wù)器中。 我們使用下面的代碼測試測試本節(jié)代碼:

if __name__ == '__main__':
    from wsgiref.simple_server import make_server

    # Create the dispatcher and register functions
    dispatcher = PathDispatcher()
    pass

    # Launch a basic server
    httpd = make_server('', 8080, dispatcher)
    print('Serving on port 8080...')
    httpd.serve_forever()

上面代碼創(chuàng)建了一個(gè)簡單的服務(wù)器,然后你就可以來測試下你的實(shí)現(xiàn)是否能正常工作。 最后,當(dāng)你準(zhǔn)備進(jìn)一步擴(kuò)展你的程序的時(shí)候,你可以修改這個(gè)代碼,讓它可以為特定服務(wù)器工作。

WSGI 本身是一個(gè)很小的標(biāo)準(zhǔn)。因此它并沒有提供一些高級的特性比如認(rèn)證、cookies、重定向等。 這些你自己實(shí)現(xiàn)起來也不難。不過如果你想要更多的支持,可以考慮第三方庫,比如WebOb 或者Paste

通過 XML-RPC 實(shí)現(xiàn)簡單的遠(yuǎn)程調(diào)用

問題

你想找到一個(gè)簡單的方式去執(zhí)行運(yùn)行在遠(yuǎn)程機(jī)器上面的 Python 程序中的函數(shù)或方法。

解決方案

實(shí)現(xiàn)一個(gè)遠(yuǎn)程方法調(diào)用的最簡單方式是使用 XML-RPC。下面我們演示一下一個(gè)實(shí)現(xiàn)了鍵-值存儲功能的簡單服務(wù)器:

from xmlrpc.server import SimpleXMLRPCServer

class KeyValueServer:
    _rpc_methods_ = ['get', 'set', 'delete', 'exists', 'keys']
    def __init__(self, address):
        self._data = {}
        self._serv = SimpleXMLRPCServer(address, allow_none=True)
        for name in self._rpc_methods_:
            self._serv.register_function(getattr(self, name))

    def get(self, name):
        return self._data[name]

    def set(self, name, value):
        self._data[name] = value

    def delete(self, name):
        del self._data[name]

    def exists(self, name):
        return name in self._data

    def keys(self):
        return list(self._data)

    def serve_forever(self):
        self._serv.serve_forever()

# Example
if __name__ == '__main__':
    kvserv = KeyValueServer(('', 15000))
    kvserv.serve_forever()

下面我們從一個(gè)客戶端機(jī)器上面來訪問服務(wù)器:

>>> from xmlrpc.client import ServerProxy
>>> s = ServerProxy('http://localhost:15000', allow_none=True)
>>> s.set('foo', 'bar')
>>> s.set('spam', [1, 2, 3])
>>> s.keys()
['spam', 'foo']
>>> s.get('foo')
'bar'
>>> s.get('spam')
[1, 2, 3]
>>> s.delete('spam')
>>> s.exists('spam')
False
>>>

討論

XML-RPC 可以讓我們很容易的構(gòu)造一個(gè)簡單的遠(yuǎn)程調(diào)用服務(wù)。你所需要做的僅僅是創(chuàng)建一個(gè)服務(wù)器實(shí)例, 通過它的方法 register_function()來注冊函數(shù),然后使用方法 serve_forever()啟動它。 在上面我們將這些步驟放在一起寫到一個(gè)類中,不夠這并不是必須的。比如你還可以像下面這樣創(chuàng)建一個(gè)服務(wù)器:

from xmlrpc.server import SimpleXMLRPCServer
def add(x,y):
    return x+y

serv = SimpleXMLRPCServer(('', 15000))
serv.register_function(add)
serv.serve_forever()

XML-RPC 暴露出來的函數(shù)只能適用于部分?jǐn)?shù)據(jù)類型,比如字符串、整形、列表和字典。 對于其他類型就得需要做些額外的功課了。 例如,如果你想通過 XML-RPC 傳遞一個(gè)對象實(shí)例,實(shí)際上只有他的實(shí)例字典被處理:

>>> class Point:
...     def __init__(self, x, y):
...             self.x = x
...             self.y = y
...
>>> p = Point(2, 3)
>>> s.set('foo', p)
>>> s.get('foo')
{'x': 2, 'y': 3}
>>>

類似的,對于二進(jìn)制數(shù)據(jù)的處理也跟你想象的不太一樣:

>>> s.set('foo', b'Hello World')
>>> s.get('foo')
<xmlrpc.client.Binary object at 0x10131d410>

>>> _.data
b'Hello World'
>>>

一般來講,你不應(yīng)該將 XML-RPC 服務(wù)以公共 API 的方式暴露出來。 對于這種情況,通常分布式應(yīng)用程序會是一個(gè)更好的選擇。

XML-RPC 的一個(gè)缺點(diǎn)是它的性能。SimpleXMLRPCServer 的實(shí)現(xiàn)是單線程的, 所以它不適合于大型程序,盡管我們在11.2小節(jié)中演示過它是可以通過多線程來執(zhí)行的。 另外,由于 XML-RPC 將所有數(shù)據(jù)都序列化為 XML 格式,所以它會比其他的方式運(yùn)行的慢一些。 但是它也有優(yōu)點(diǎn),這種方式的編碼可以被絕大部分其他編程語言支持。 通過使用這種方式,其他語言的客戶端程序都能訪問你的服務(wù)。

雖然 XML-RPC 有很多缺點(diǎn),但是如果你需要快速構(gòu)建一個(gè)簡單遠(yuǎn)程過程調(diào)用系統(tǒng)的話,它仍然值得去學(xué)習(xí)的。 有時(shí)候,簡單的方案就已經(jīng)足夠了。

在不同的 Python 解釋器之間交互

問題

你在不同的機(jī)器上面運(yùn)行著多個(gè) Python 解釋器實(shí)例,并希望能夠在這些解釋器之間通過消息來交換數(shù)據(jù)。

解決方案

通過使用 multiprocessing.connection模塊可以很容易的實(shí)現(xiàn)解釋器之間的通信。 下面是一個(gè)簡單的應(yīng)答服務(wù)器例子:

from multiprocessing.connection import Listener
import traceback

def echo_client(conn):
    try:
        while True:
            msg = conn.recv()
            conn.send(msg)
    except EOFError:
        print('Connection closed')

def echo_server(address, authkey):
    serv = Listener(address, authkey=authkey)
    while True:
        try:
            client = serv.accept()

            echo_client(client)
        except Exception:
            traceback.print_exc()

echo_server(('', 25000), authkey=b'peekaboo')

然后客戶端連接服務(wù)器并發(fā)送消息的簡單示例:

>>> from multiprocessing.connection import Client
>>> c = Client(('localhost', 25000), authkey=b'peekaboo')
>>> c.send('hello')
>>> c.recv()
'hello'
>>> c.send(42)
>>> c.recv()
42
>>> c.send([1, 2, 3, 4, 5])
>>> c.recv()
[1, 2, 3, 4, 5]
>>>

跟底層 socket 不同的是,每個(gè)消息會完整保存(每一個(gè)通過 send()發(fā)送的對象能通過 recv()來完整接受)。 另外,所有對象會通過 pickle 序列化。因此,任何兼容 pickle 的對象都能在此連接上面被發(fā)送和接受。

討論

目前有很多用來實(shí)現(xiàn)各種消息傳輸?shù)陌秃瘮?shù)庫,比如 ZeroMQ、Celery 等。 你還有另外一種選擇就是自己在底層 socket 基礎(chǔ)之上來實(shí)現(xiàn)一個(gè)消息傳輸層。 但是你想要簡單一點(diǎn)的方案,那么這時(shí)候 multiprocessing.connection 就派上用場了。 僅僅使用一些簡單的語句即可實(shí)現(xiàn)多個(gè)解釋器之間的消息通信。

如果你的解釋器運(yùn)行在同一臺機(jī)器上面,那么你可以使用另外的通信機(jī)制,比如 Unix 域套接字或者是 Windows 命名管道。 要想使用 UNIX 域套接字來創(chuàng)建一個(gè)連接,只需簡單的將地址改寫一個(gè)文件名即可:

s = Listener('/tmp/myconn', authkey=b'peekaboo')

要想使用 Windows 命名管道來創(chuàng)建連接,只需像下面這樣使用一個(gè)文件名:

s = Listener(r'\\.\pipe\myconn', authkey=b'peekaboo')

一個(gè)通用準(zhǔn)則是,你不要使用 multiprocessing 來實(shí)現(xiàn)一個(gè)對外的公共服務(wù)。 Client()Listener() 中的 authkey 參數(shù)用來認(rèn)證發(fā)起連接的終端用戶。 如果密鑰不對會產(chǎn)生一個(gè)異常。此外,該模塊最適合用來建立長連接(而不是大量的短連接), 例如,兩個(gè)解釋器之間啟動后就開始建立連接并在處理某個(gè)問題過程中會一直保持連接狀態(tài)。

如果你需要對底層連接做更多的控制,比如需要支持超時(shí)、非阻塞 I/O 或其他類似的特性, 你最好使用另外的庫或者是在高層 socket 上來實(shí)現(xiàn)這些特性。

實(shí)現(xiàn)遠(yuǎn)程方法調(diào)用

問題

你想在一個(gè)消息傳輸層如 sockets 、multiprocessing connectionsZeroMQ的基礎(chǔ)之上實(shí)現(xiàn)一個(gè)簡單的遠(yuǎn)程過程調(diào)用(RPC)。

解決方案

將函數(shù)請求、參數(shù)和返回值使用 pickle 編碼后,在不同的解釋器直接傳送 pickle 字節(jié)字符串,可以很容易的實(shí)現(xiàn) RPC。 下面是一個(gè)簡單的 PRC 處理器,可以被整合到一個(gè)服務(wù)器中去:

# rpcserver.py

import pickle
class RPCHandler:
    def __init__(self):
        self._functions = { }

    def register_function(self, func):
        self._functions[func.__name__] = func

    def handle_connection(self, connection):
        try:
            while True:
                # Receive a message
                func_name, args, kwargs = pickle.loads(connection.recv())
                # Run the RPC and send a response
                try:
                    r = self._functions[func_name](*args,**kwargs)
                    connection.send(pickle.dumps(r))
                except Exception as e:
                    connection.send(pickle.dumps(e))
        except EOFError:
             pass

要使用這個(gè)處理器,你需要將它加入到一個(gè)消息服務(wù)器中。你有很多種選擇, 但是使用 multiprocessing 庫是最簡單的。下面是一個(gè) RPC 服務(wù)器例子:

from multiprocessing.connection import Listener
from threading import Thread

def rpc_server(handler, address, authkey):
    sock = Listener(address, authkey=authkey)
    while True:
        client = sock.accept()
        t = Thread(target=handler.handle_connection, args=(client,))
        t.daemon = True
        t.start()

# Some remote functions
def add(x, y):
    return x + y

def sub(x, y):
    return x - y

# Register with a handler
handler = RPCHandler()
handler.register_function(add)
handler.register_function(sub)

# Run the server
rpc_server(handler, ('localhost', 17000), authkey=b'peekaboo')

為了從一個(gè)遠(yuǎn)程客戶端訪問服務(wù)器,你需要?jiǎng)?chuàng)建一個(gè)對應(yīng)的用來傳送請求的 RPC 代理類。例如

import pickle

class RPCProxy:
    def __init__(self, connection):
        self._connection = connection
    def __getattr__(self, name):
        def do_rpc(*args, **kwargs):
            self._connection.send(pickle.dumps((name, args, kwargs)))
            result = pickle.loads(self._connection.recv())
            if isinstance(result, Exception):
                raise result
            return result
        return do_rpc

要使用這個(gè)代理類,你需要將其包裝到一個(gè)服務(wù)器的連接上面,例如:

>>> from multiprocessing.connection import Client
>>> c = Client(('localhost', 17000), authkey=b'peekaboo')
>>> proxy = RPCProxy(c)
>>> proxy.add(2, 3)

5
>>> proxy.sub(2, 3)
-1
>>> proxy.sub([1, 2], 4)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "rpcserver.py", line 37, in do_rpc
    raise result
TypeError: unsupported operand type(s) for -: 'list' and 'int'
>>>

要注意的是很多消息層(比如 multiprocessing )已經(jīng)使用 pickle 序列化了數(shù)據(jù)。 如果是這樣的話,對 pickle.dumps()pickle.loads() 的調(diào)用要去掉。

討論

RPCHandlerRPCProxy 的基本思路是很比較簡單的。 如果一個(gè)客戶端想要調(diào)用一個(gè)遠(yuǎn)程函數(shù),比如 foo(1, 2, z=3) ,代理類創(chuàng)建一個(gè)包含了函數(shù)名和參數(shù)的元組 ('foo', (1, 2), {'z': 3})。 這個(gè)元組被 pickle 序列化后通過網(wǎng)絡(luò)連接發(fā)生出去。 這一步在 RPCProxy__getattr__()方法返回的 do_rpc() 閉包中完成。 服務(wù)器接收后通過 pickle 反序列化消息,查找函數(shù)名看看是否已經(jīng)注冊過,然后執(zhí)行相應(yīng)的函數(shù)。 執(zhí)行結(jié)果(或異常)被pickle序列化后返回發(fā)送給客戶端。我們的實(shí)例需要依賴 multiprocessing進(jìn)行通信。 不過,這種方式可以適用于其他任何消息系統(tǒng)。例如,如果你想在 ZeroMQ 之上實(shí)習(xí) RPC, 僅僅只需要將連接對象換成合適的 ZeroMQ 的 socket 對象即可。

由于底層需要依賴 pickle,那么安全問題就需要考慮了 (因?yàn)橐粋€(gè)聰明的黑客可以創(chuàng)建特定的消息,能夠讓任意函數(shù)通過 pickle 反序列化后被執(zhí)行)。 因此你永遠(yuǎn)不要允許來自不信任或未認(rèn)證的客戶端的 RPC。特別是你絕對不要允許來自 Internet 的任意機(jī)器的訪問, 這種只能在內(nèi)部被使用,位于防火墻后面并且不要對外暴露。

作為 pickle 的替代,你也許可以考慮使用 JSON、XML 或一些其他的編碼格式來序列化消息。 例如,本機(jī)實(shí)例可以很容易的改寫成 JSON 編碼方案。還需要將 pickle.loads()pickle.dumps()替換成 json.loads()json.dumps() 即可:

# jsonrpcserver.py
import json

class RPCHandler:
    def __init__(self):
        self._functions = { }

    def register_function(self, func):
        self._functions[func.__name__] = func

    def handle_connection(self, connection):
        try:
            while True:
                # Receive a message
                func_name, args, kwargs = json.loads(connection.recv())
                # Run the RPC and send a response
                try:
                    r = self._functions[func_name](*args,**kwargs)
                    connection.send(json.dumps(r))
                except Exception as e:
                    connection.send(json.dumps(str(e)))
        except EOFError:
             pass

# jsonrpcclient.py
import json

class RPCProxy:
    def __init__(self, connection):
        self._connection = connection
    def __getattr__(self, name):
        def do_rpc(*args, **kwargs):
            self._connection.send(json.dumps((name, args, kwargs)))
            result = json.loads(self._connection.recv())
            return result
        return do_rpc

實(shí)現(xiàn) RPC 的一個(gè)比較復(fù)雜的問題是如何去處理異常。至少,當(dāng)方法產(chǎn)生異常時(shí)服務(wù)器不應(yīng)該奔潰。 因此,返回給客戶端的異常所代表的含義就要好好設(shè)計(jì)了。 如果你使用 pickle,異常對象實(shí)例在客戶端能被反序列化并拋出。如果你使用其他的協(xié)議,那得想想另外的方法了。 不過至少,你應(yīng)該在響應(yīng)中返回異常字符串。我們在 JSON 的例子中就是使用的這種方式。

對于其他的 RPC 實(shí)現(xiàn)例子,我推薦你看看在 XML-RPC 中使用的 SimpleXMLRPCServerServerProxy 的實(shí)現(xiàn), 也就是11.6小節(jié)中的內(nèi)容。

簡單的客