本章是關(guān)于在網(wǎng)絡(luò)應(yīng)用和分布式應(yīng)用中使用的各種主題。主題劃分為使用 Python 編寫客戶端程序來訪問已有的服務(wù),以及使用 Python 實(shí)現(xiàn)網(wǎng)絡(luò)服務(wù)端程序。也給出了一些常見的技術(shù),用于編寫涉及協(xié)同或通信的的代碼。
你需要通過 HTTP 協(xié)議以客戶端的方式訪問多種服務(wù)。例如,下載數(shù)據(jù)或者與基于 REST 的 API 進(jìn)行交互。
對于簡單的事情來說,通常使用 urllib.request
模塊就夠了。例如,發(fā)送一個(gè)簡單的 HTTP GET 請求到遠(yuǎn)程的服務(wù)上,可以這樣做:
from urllib import request, parse
# Base URL being accessed
url = 'http://httpbin.org/get'
# Dictionary of query parameters (if any)
parms = {
'name1' : 'value1',
'name2' : 'value2'
}
# Encode the query string
querystring = parse.urlencode(parms)
# Make a GET request and read the response
u = request.urlopen(url+'?' + querystring)
resp = u.read()
如果你需要使用 POST 方法在請求主體中發(fā)送查詢參數(shù),可以將參數(shù)編碼后作為可選參數(shù)提供給 urlopen()
函數(shù),就像這樣:
from urllib import request, parse
# Base URL being accessed
url = 'http://httpbin.org/post'
# Dictionary of query parameters (if any)
parms = {
'name1' : 'value1',
'name2' : 'value2'
}
# Encode the query string
querystring = parse.urlencode(parms)
# Make a POST request and read the response
u = request.urlopen(url, querystring.encode('ascii'))
resp = u.read()
如果你需要在發(fā)出的請求中提供一些自定義的 HTTP 頭,例如修改 user-agent
字段,可以創(chuàng)建一個(gè)包含字段值的字典,并創(chuàng)建一個(gè) Request 實(shí)例然后將其傳給 urlopen()
,如下:
from urllib import request, parse
...
# Extra headers
headers = {
'User-agent' : 'none/ofyourbusiness',
'Spam' : 'Eggs'
}
req = request.Request(url, querystring.encode('ascii'), headers=headers)
# Make a request and read the response
u = request.urlopen(req)
resp = u.read()
如果需要交互的服務(wù)比上面的例子都要復(fù)雜,也許應(yīng)該去看看 requests 庫(https://pypi.python.org/pypi/requests)。例如,下面這個(gè)示例采用 requests 庫重新實(shí)現(xiàn)了上面的操作:
import requests
# Base URL being accessed
url = 'http://httpbin.org/post'
# Dictionary of query parameters (if any)
parms = {
'name1' : 'value1',
'name2' : 'value2'
}
# Extra headers
headers = {
'User-agent' : 'none/ofyourbusiness',
'Spam' : 'Eggs'
}
resp = requests.post(url, data=parms, headers=headers)
# Decoded text returned by the request
text = resp.text
關(guān)于 requests 庫,一個(gè)值得一提的特性就是它能以多種方式從請求中返回響應(yīng)結(jié)果的內(nèi)容。從上面的代碼來看, resp.text
帶給我們的是以 Unicode 解碼的響應(yīng)文本。但是,如果去訪問 resp.content
,就會得到原始的二進(jìn)制數(shù)據(jù)。另一方面,如果訪問 resp.json
,那么就會得到 JSON 格式的響應(yīng)內(nèi)容。
下面這個(gè)示例利用 requests
庫發(fā)起一個(gè) HEAD 請求,并從響應(yīng)中提取出一些 HTTP 頭數(shù)據(jù)的字段:
import requests
resp = requests.head('http://www.python.org/index.html')
status = resp.status_code
last_modified = resp.headers['last-modified']
content_type = resp.headers['content-type']
content_length = resp.headers['content-length']
Here is a requests example that executes a login into the Python Package index using
basic authentication:
import requests
resp = requests.get('http://pypi.python.org/pypi?:action=login',
auth=('user','password'))
Here is an example of using requests to pass HTTP cookies from one request to the
next:
import requests
# First request
resp1 = requests.get(url)
...
# Second requests with cookies received on first requests
resp2 = requests.get(url, cookies=resp1.cookies)
Last, but not least, here is an example of using requests to upload content:
import requests
url = 'http://httpbin.org/post'
files = { 'file': ('data.csv', open('data.csv', 'rb')) }
r = requests.post(url, files=files)
對于真的很簡單 HTTP 客戶端代碼,用內(nèi)置的 urllib
模塊通常就足夠了。但是,如果你要做的不僅僅只是簡單的 GET 或 POST 請求,那就真的不能再依賴它的功能了。這時(shí)候就是第三方模塊比如 requests
大顯身手的時(shí)候了。
例如,如果你決定堅(jiān)持使用標(biāo)準(zhǔn)的程序庫而不考慮像 requests
這樣的第三方庫,那么也許就不得不使用底層的 http.client
模塊來實(shí)現(xiàn)自己的代碼。比方說,下面的代碼展示了如何執(zhí)行一個(gè) HEAD 請求:
from http.client import HTTPConnection
from urllib import parse
c = HTTPConnection('www.python.org', 80)
c.request('HEAD', '/index.html')
resp = c.getresponse()
print('Status', resp.status)
for name, value in resp.getheaders():
print(name, value)
同樣地,如果必須編寫涉及代理、認(rèn)證、cookies 以及其他一些細(xì)節(jié)方面的代碼,那么使用 urllib
就顯得特別別扭和啰嗦。比方說,下面這個(gè)示例實(shí)現(xiàn)在 Python 包索引上的認(rèn)證:
import urllib.request
auth = urllib.request.HTTPBasicAuthHandler()
auth.add_password('pypi','http://pypi.python.org','username','password')
opener = urllib.request.build_opener(auth)
r = urllib.request.Request('http://pypi.python.org/pypi?:action=login')
u = opener.open(r)
resp = u.read()
# From here. You can access more pages using opener
...
坦白說,所有的這些操作在 requests
庫中都變得簡單的多。
在開發(fā)過程中測試 HTTP 客戶端代碼常常是很令人沮喪的,因?yàn)樗屑值募?xì)節(jié)問題都需要考慮(例如 cookies、認(rèn)證、HTTP 頭、編碼方式等)。要完成這些任務(wù),考慮使用 httpbin 服務(wù)(http://httpbin.org)。這個(gè)站點(diǎn)會接收發(fā)出的請求,然后以 JSON 的形式將相應(yīng)信息回傳回來。下面是一個(gè)交互式的例子:
>>> import requests
>>> r = requests.get('http://httpbin.org/get?name=Dave&n=37',
... headers = { 'User-agent': 'goaway/1.0' })
>>> resp = r.json
>>> resp['headers']
{'User-Agent': 'goaway/1.0', 'Content-Length': '', 'Content-Type': '',
'Accept-Encoding': 'gzip, deflate, compress', 'Connection':
'keep-alive', 'Host': 'httpbin.org', 'Accept': '*/*'}
>>> resp['args']
{'name': 'Dave', 'n': '37'}
>>>
在要同一個(gè)真正的站點(diǎn)進(jìn)行交互前,先在 httpbin.org 這樣的網(wǎng)站上做實(shí)驗(yàn)常常是可取的辦法。尤其是當(dāng)我們面對3次登錄失敗就會關(guān)閉賬戶這樣的風(fēng)險(xiǎn)時(shí)尤為有用(不要嘗試自己編寫 HTTP 認(rèn)證客戶端來登錄你的銀行賬戶)。
盡管本節(jié)沒有涉及, request
庫還對許多高級的 HTTP 客戶端協(xié)議提供了支持,比如 OAuth。 requests
模塊的文檔(http://docs.python-requests.org)質(zhì)量很高(坦白說比在這短短的一節(jié)的篇幅中所提供的任何信息都好),可以參考文檔以獲得更多地信息。
你想實(shí)現(xiàn)一個(gè)服務(wù)器,通過 TCP 協(xié)議和客戶端通信。
創(chuàng)建一個(gè) TCP 服務(wù)器的一個(gè)簡單方法是使用 socketserver
庫。例如,下面是一個(gè)簡單的應(yīng)答服務(wù)器:
from socketserver import BaseRequestHandler, TCPServer
class EchoHandler(BaseRequestHandler):
def handle(self):
print('Got connection from', self.client_address)
while True:
msg = self.request.recv(8192)
if not msg:
break
self.request.send(msg)
if __name__ == '__main__':
serv = TCPServer(('', 20000), EchoHandler)
serv.serve_forever()
在這段代碼中,你定義了一個(gè)特殊的處理類,實(shí)現(xiàn)了一個(gè) handle()
方法,用來為客戶端連接服務(wù)。request
屬性是客戶端 socket,client_address
有客戶端地址。 為了測試這個(gè)服務(wù)器,運(yùn)行它并打開另外一個(gè) Python 進(jìn)程連接這個(gè)服務(wù)器:
>>> from socket import socket, AF_INET, SOCK_STREAM
>>> s = socket(AF_INET, SOCK_STREAM)
>>> s.connect(('localhost', 20000))
>>> s.send(b'Hello')
5
>>> s.recv(8192)
b'Hello'
>>>
很多時(shí)候,可以很容易的定義一個(gè)不同的處理器。下面是一個(gè)使用 StreamRequestHandler
基類將一個(gè)類文件接口放置在底層 socket 上的例子:
from socketserver import StreamRequestHandler, TCPServer
class EchoHandler(StreamRequestHandler):
def handle(self):
print('Got connection from', self.client_address)
# self.rfile is a file-like object for reading
for line in self.rfile:
# self.wfile is a file-like object for writing
self.wfile.write(line)
if __name__ == '__main__':
serv = TCPServer(('', 20000), EchoHandler)
serv.serve_forever()
socketserver
可以讓我們很容易的創(chuàng)建簡單的 TCP 服務(wù)器。 但是,你需要注意的是,默認(rèn)情況下這種服務(wù)器是單線程的,一次只能為一個(gè)客戶端連接服務(wù)。 如果你想處理多個(gè)客戶端,可以初始化一個(gè) ForkingTCPServer
或者是 ThreadingTCPServer
對象。例如:
from socketserver import ThreadingTCPServer
if __name__ == '__main__':
serv = ThreadingTCPServer(('', 20000), EchoHandler)
serv.serve_forever()
使用 fork 或線程服務(wù)器有個(gè)潛在問題就是它們會為每個(gè)客戶端連接創(chuàng)建一個(gè)新的進(jìn)程或線程。 由于客戶端連接數(shù)是沒有限制的,因此一個(gè)惡意的黑客可以同時(shí)發(fā)送大量的連接讓你的服務(wù)器奔潰。
如果你擔(dān)心這個(gè)問題,你可以創(chuàng)建一個(gè)預(yù)先分配大小的工作線程池或進(jìn)程池。 你先創(chuàng)建一個(gè)普通的非線程服務(wù)器,然后在一個(gè)線程池中使用 serve_forever()
方法來啟動它們。
if __name__ == '__main__':
from threading import Thread
NWORKERS = 16
serv = TCPServer(('', 20000), EchoHandler)
for n in range(NWORKERS):
t = Thread(target=serv.serve_forever)
t.daemon = True
t.start()
serv.serve_forever()
一般來講,一個(gè) TCPServer
在實(shí)例化的時(shí)候會綁定并激活相應(yīng)的 socket
。 不過,有時(shí)候你想通過設(shè)置某些選項(xiàng)去調(diào)整底下的 socket
,可以設(shè)置參數(shù) bind_and_activate=False
。如下:
if __name__ == '__main__':
serv = TCPServer(('', 20000), EchoHandler, bind_and_activate=False)
# Set up various socket options
serv.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
# Bind and activate
serv.server_bind()
serv.server_activate()
serv.serve_forever()
上面的socket
選項(xiàng)是一個(gè)非常普遍的配置項(xiàng),它允許服務(wù)器重新綁定一個(gè)之前使用過的端口號。 由于要被經(jīng)常使用到,它被放置到類變量中,可以直接在 TCPServer
上面設(shè)置。 在實(shí)例化服務(wù)器的時(shí)候去設(shè)置它的值,如下所示:
if __name__ == '__main__':
TCPServer.allow_reuse_address = True
serv = TCPServer(('', 20000), EchoHandler)
serv.serve_forever()
在上面示例中,我們演示了兩種不同的處理器基類( BaseRequestHandler
和 StreamRequestHandler
)。 StreamRequestHandler
更加靈活點(diǎn),能通過設(shè)置其他的類變量來支持一些新的特性。比如:
import socket
class EchoHandler(StreamRequestHandler):
# Optional settings (defaults shown)
timeout = 5 # Timeout on all socket operations
rbufsize = -1 # Read buffer size
wbufsize = 0 # Write buffer size
disable_nagle_algorithm = False # Sets TCP_NODELAY socket option
def handle(self):
print('Got connection from', self.client_address)
try:
for line in self.rfile:
# self.wfile is a file-like object for writing
self.wfile.write(line)
except socket.timeout:
print('Timed out!')
最后,還需要注意的是巨大部分 Python 的高層網(wǎng)絡(luò)模塊(比如 HTTP、XML-RPC 等)都是建立在 socketserver
功能之上。 也就是說,直接使用 socket
庫來實(shí)現(xiàn)服務(wù)器也并不是很難。 下面是一個(gè)使用 socket
直接編程實(shí)現(xiàn)的一個(gè)服務(wù)器簡單例子:
from socket import socket, AF_INET, SOCK_STREAM
def echo_handler(address, client_sock):
print('Got connection from {}'.format(address))
while True:
msg = client_sock.recv(8192)
if not msg:
break
client_sock.sendall(msg)
client_sock.close()
def echo_server(address, backlog=5):
sock = socket(AF_INET, SOCK_STREAM)
sock.bind(address)
sock.listen(backlog)
while True:
client_sock, client_addr = sock.accept()
echo_handler(client_addr, client_sock)
if __name__ == '__main__':
echo_server(('', 20000))
你想實(shí)現(xiàn)一個(gè)基于 UDP 協(xié)議的服務(wù)器來與客戶端通信。
跟 TCP 一樣,UDP 服務(wù)器也可以通過使用 socketserver
庫很容易的被創(chuàng)建。 例如,下面是一個(gè)簡單的時(shí)間服務(wù)器:
from socketserver import BaseRequestHandler, UDPServer
import time
class TimeHandler(BaseRequestHandler):
def handle(self):
print('Got connection from', self.client_address)
# Get message and client socket
msg, sock = self.request
resp = time.ctime()
sock.sendto(resp.encode('ascii'), self.client_address)
if __name__ == '__main__':
serv = UDPServer(('', 20000), TimeHandler)
serv.serve_forever()
跟之前一樣,你先定義一個(gè)實(shí)現(xiàn) handle()
特殊方法的類,為客戶端連接服務(wù)。 這個(gè)類的 request
屬性是一個(gè)包含了數(shù)據(jù)報(bào)和底層 socket 對象的元組。client_address
包含了客戶端地址。
我們來測試下這個(gè)服務(wù)器,首先運(yùn)行它,然后打開另外一個(gè) Python 進(jìn)程向服務(wù)器發(fā)送消息:
>>> from socket import socket, AF_INET, SOCK_DGRAM
>>> s = socket(AF_INET, SOCK_DGRAM)
>>> s.sendto(b'', ('localhost', 20000))
0
>>> s.recvfrom(8192)
(b'Wed Aug 15 20:35:08 2012', ('127.0.0.1', 20000))
>>>
一個(gè)典型的 UPD 服務(wù)器接收到達(dá)的數(shù)據(jù)報(bào)(消息)和客戶端地址。如果服務(wù)器需要做應(yīng)答, 它要給客戶端回發(fā)一個(gè)數(shù)據(jù)報(bào)。對于數(shù)據(jù)報(bào)的傳送, 你應(yīng)該使用 socket 的 sendto()
和 recvfrom()
方法。 盡管傳統(tǒng)的 send()
和 recv()
也可以達(dá)到同樣的效果, 但是前面的兩個(gè)方法對于 UDP 連接而言更普遍。
由于沒有底層的連接,UPD 服務(wù)器相對于 TCP 服務(wù)器來講實(shí)現(xiàn)起來更加簡單。 不過,UDP 天生是不可靠的(因?yàn)橥ㄐ艣]有建立連接,消息可能丟失)。 因此需要由你自己來決定該怎樣處理丟失消息的情況。這個(gè)已經(jīng)不在本書討論范圍內(nèi)了, 不過通常來說,如果可靠性對于你程序很重要,你需要借助于序列號、重試、超時(shí)以及一些其他方法來保證。 UDP 通常被用在那些對于可靠傳輸要求不是很高的場合。例如,在實(shí)時(shí)應(yīng)用如多媒體流以及游戲領(lǐng)域, 無需返回恢復(fù)丟失的數(shù)據(jù)包(程序只需簡單的忽略它并繼續(xù)向前運(yùn)行)。
UDPServer
類是單線程的,也就是說一次只能為一個(gè)客戶端連接服務(wù)。 實(shí)際使用中,這個(gè)無論是對于 UDP 還是 TCP 都不是什么大問題。 如果你想要并發(fā)操作,可以實(shí)例化一個(gè) ForkingUDPServer
或 ThreadingUDPServer
對象:
from socketserver import ThreadingUDPServer
if __name__ == '__main__':
serv = ThreadingUDPServer(('',20000), TimeHandler)
serv.serve_forever()
直接使用 socket
來是想一個(gè) UDP 服務(wù)器也不難,下面是一個(gè)例子:
from socket import socket, AF_INET, SOCK_DGRAM
import time
def time_server(address):
sock = socket(AF_INET, SOCK_DGRAM)
sock.bind(address)
while True:
msg, addr = sock.recvfrom(8192)
print('Got message from', addr)
resp = time.ctime()
sock.sendto(resp.encode('ascii'), addr)
if __name__ == '__main__':
time_server(('', 20000))
你有一個(gè) CIDR 網(wǎng)絡(luò)地址比如“123.45.67.89/27”,你想將其轉(zhuǎn)換成它所代表的所有 IP (比如,“123.45.67.64”, “123.45.67.65”, …, “123.45.67.95”))
可以使用 ipaddress
模塊很容易的實(shí)現(xiàn)這樣的計(jì)算。例如:
>>> import ipaddress
>>> net = ipaddress.ip_network('123.45.67.64/27')
>>> net
IPv4Network('123.45.67.64/27')
>>> for a in net:
... print(a)
...
123.45.67.64
123.45.67.65
123.45.67.66
123.45.67.67
123.45.67.68
...
123.45.67.95
>>>
>>> net6 = ipaddress.ip_network('12:3456:78:90ab:cd:ef01:23:30/125')
>>> net6
IPv6Network('12:3456:78:90ab:cd:ef01:23:30/125')
>>> for a in net6:
... print(a)
...
12:3456:78:90ab:cd:ef01:23:30
12:3456:78:90ab:cd:ef01:23:31
12:3456:78:90ab:cd:ef01:23:32
12:3456:78:90ab:cd:ef01:23:33
12:3456:78:90ab:cd:ef01:23:34
12:3456:78:90ab:cd:ef01:23:35
12:3456:78:90ab:cd:ef01:23:36
12:3456:78:90ab:cd:ef01:23:37
>>>
Network
也允許像數(shù)組一樣的索引取值,例如:
>>> net.num_addresses
32
>>> net[0]
IPv4Address('123.45.67.64')
>>> net[1]
IPv4Address('123.45.67.65')
>>> net[-1]
IPv4Address('123.45.67.95')
>>> net[-2]
IPv4Address('123.45.67.94')
>>>
另外,你還可以執(zhí)行網(wǎng)絡(luò)成員檢查之類的操作:
>>> a = ipaddress.ip_address('123.45.67.69')
>>> a in net
True
>>> b = ipaddress.ip_address('123.45.67.123')
>>> b in net
False
>>>
一個(gè) IP 地址和網(wǎng)絡(luò)地址能通過一個(gè) IP 接口來指定,例如:
>>> inet = ipaddress.ip_interface('123.45.67.73/27')
>>> inet.network
IPv4Network('123.45.67.64/27')
>>> inet.ip
IPv4Address('123.45.67.73')
>>>
ipaddress
模塊有很多類可以表示 IP 地址、網(wǎng)絡(luò)和接口。 當(dāng)你需要操作網(wǎng)絡(luò)地址(比如解析、打印、驗(yàn)證等)的時(shí)候會很有用。
要注意的是,ipaddress
模塊跟其他一些和網(wǎng)絡(luò)相關(guān)的模塊比如 socket
庫交集很少。 所以,你不能使用 IPv4Address
的實(shí)例來代替一個(gè)地址字符串,你首先得顯式的使用str()
轉(zhuǎn)換它。例如:
>>> a = ipaddress.ip_address('127.0.0.1')
>>> from socket import socket, AF_INET, SOCK_STREAM
>>> s = socket(AF_INET, SOCK_STREAM)
>>> s.connect((a, 8080))
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
TypeError: Can't convert 'IPv4Address' object to str implicitly
>>> s.connect((str(a), 8080))
>>>
更多相關(guān)內(nèi)容,請參考 An Introduction to the ipaddress Module
你想使用一個(gè)簡單的 REST 接口通過網(wǎng)絡(luò)遠(yuǎn)程控制或訪問你的應(yīng)用程序,但是你又不想自己去安裝一個(gè)完整的 web 框架。
構(gòu)建一個(gè) REST 風(fēng)格的接口最簡單的方法是創(chuàng)建一個(gè)基于 WSGI 標(biāo)準(zhǔn)(PEP 3333)的很小的庫,下面是一個(gè)例子:
# resty.py
import cgi
def notfound_404(environ, start_response):
start_response('404 Not Found', [ ('Content-type', 'text/plain') ])
return [b'Not Found']
class PathDispatcher:
def __init__(self):
self.pathmap = { }
def __call__(self, environ, start_response):
path = environ['PATH_INFO']
params = cgi.FieldStorage(environ['wsgi.input'],
environ=environ)
method = environ['REQUEST_METHOD'].lower()
environ['params'] = { key: params.getvalue(key) for key in params }
handler = self.pathmap.get((method,path), notfound_404)
return handler(environ, start_response)
def register(self, method, path, function):
self.pathmap[method.lower(), path] = function
return function
為了使用這個(gè)調(diào)度器,你只需要編寫不同的處理器,就像下面這樣:
import time
_hello_resp = '''\
<html>
<head>
<title>Hello {name}</title>
</head>
<body>
<h1>Hello {name}!</h1>
</body>
</html>'''
def hello_world(environ, start_response):
start_response('200 OK', [ ('Content-type','text/html')])
params = environ['params']
resp = _hello_resp.format(name=params.get('name'))
yield resp.encode('utf-8')
_localtime_resp = '''\
<?xml version="1.0"?>
<time>
<year>{t.tm_year}</year>
<month>{t.tm_mon}</month>
<day>{t.tm_mday}</day>
<hour>{t.tm_hour}</hour>
<minute>{t.tm_min}</minute>
<second>{t.tm_sec}</second>
</time>'''
def localtime(environ, start_response):
start_response('200 OK', [ ('Content-type', 'application/xml') ])
resp = _localtime_resp.format(t=time.localtime())
yield resp.encode('utf-8')
if __name__ == '__main__':
from resty import PathDispatcher
from wsgiref.simple_server import make_server
# Create the dispatcher and register functions
dispatcher = PathDispatcher()
dispatcher.register('GET', '/hello', hello_world)
dispatcher.register('GET', '/localtime', localtime)
# Launch a basic server
httpd = make_server('', 8080, dispatcher)
print('Serving on port 8080...')
httpd.serve_forever()
要測試下這個(gè)服務(wù)器,你可以使用一個(gè)瀏覽器或 urllib
和它交互。例如:
>>> u = urlopen('http://localhost:8080/hello?name=Guido')
>>> print(u.read().decode('utf-8'))
<html>
<head>
<title>Hello Guido</title>
</head>
<body>
<h1>Hello Guido!</h1>
</body>
</html>
>>> u = urlopen('http://localhost:8080/localtime')
>>> print(u.read().decode('utf-8'))
<?xml version="1.0"?>
<time>
<year>2012</year>
<month>11</month>
<day>24</day>
<hour>14</hour>
<minute>49</minute>
<second>17</second>
</time>
>>>
在編寫 REST 接口時(shí),通常都是服務(wù)于普通的 HTTP 請求。但是跟那些功能完整的網(wǎng)站相比,你通常只需要處理數(shù)據(jù)。 這些數(shù)據(jù)以各種標(biāo)準(zhǔn)格式編碼,比如 XML、JSON 或 CSV。 盡管程序看上去很簡單,但是以這種方式提供的 API 對于很多應(yīng)用程序來講是非常有用的。
例如,長期運(yùn)行的程序可能會使用一個(gè) REST API 來實(shí)現(xiàn)監(jiān)控或診斷。 大數(shù)據(jù)應(yīng)用程序可以使用 REST 來構(gòu)建一個(gè)數(shù)據(jù)查詢或提取系統(tǒng)。 REST 還能用來控制硬件設(shè)備比如機(jī)器人、傳感器、工廠或燈泡。 更重要的是,REST API 已經(jīng)被大量客戶端編程環(huán)境所支持,比如 Javascript, Android, iOS 等。 因此,利用這種接口可以讓你開發(fā)出更加復(fù)雜的應(yīng)用程序。
為了實(shí)現(xiàn)一個(gè)簡單的 REST 接口,你只需讓你的程序代碼滿足 Python 的 WSGI 標(biāo)準(zhǔn)即可。 WSGI 被標(biāo)準(zhǔn)庫支持,同時(shí)也被絕大部分第三方 web 框架支持。 因此,如果你的代碼遵循這個(gè)標(biāo)準(zhǔn),在后面的使用過程中就會更加的靈活!
在 WSGI 中,你可以像下面這樣約定的方式以一個(gè)可調(diào)用對象形式來實(shí)現(xiàn)你的程序。
import cgi
def wsgi_app(environ, start_response):
pass
environ
屬性是一個(gè)字典,包含了從 web 服務(wù)器如 Apache [參考 Internet RFC 3875]提供的 CGI 接口中獲取的值。 要將這些不同的值提取出來,你可以像這么這樣寫:
def wsgi_app(environ, start_response):
method = environ['REQUEST_METHOD']
path = environ['PATH_INFO']
# Parse the query parameters
params = cgi.FieldStorage(environ['wsgi.input'], environ=environ)
我們展示了一些常見的值。environ['REQUEST_METHOD']
代表請求類型如 GET、POST、HEAD 等。 environ['PATH_INFO']
表示被請求資源的路徑。 調(diào)用 cgi.FieldStorage()
可以從請求中提取查詢參數(shù)并將它們放入一個(gè)類字典對象中以便后面使用。
start_response
參數(shù)是一個(gè)為了初始化一個(gè)請求對象而必須被調(diào)用的函數(shù)。 第一個(gè)參數(shù)是返回的 HTTP 狀態(tài)值,第二個(gè)參數(shù)是一個(gè)(名,值)元組列表,用來構(gòu)建返回的 HTTP 頭。例如:
def wsgi_app(environ, start_response):
pass
start_response('200 OK', [('Content-type', 'text/plain')])
為了返回?cái)?shù)據(jù),一個(gè) WSGI 程序必須返回一個(gè)字節(jié)字符串序列??梢韵裣旅孢@樣使用一個(gè)列表來完成:
def wsgi_app(environ, start_response):
pass
start_response('200 OK', [('Content-type', 'text/plain')])
resp = []
resp.append(b'Hello World\n')
resp.append(b'Goodbye!\n')
return resp
或者,你還可以使用yield
:
def wsgi_app(environ, start_response):
pass
start_response('200 OK', [('Content-type', 'text/plain')])
yield b'Hello World\n'
yield b'Goodbye!\n'
這里要強(qiáng)調(diào)的一點(diǎn)是最后返回的必須是字節(jié)字符串。如果返回結(jié)果包含文本字符串,必須先將其編碼成字節(jié)。 當(dāng)然,并沒有要求你返回的一點(diǎn)是文本,你可以很輕松的編寫一個(gè)生成圖片的程序。
盡管 WSGI 程序通常被定義成一個(gè)函數(shù),不過你也可以使用類實(shí)例來實(shí)現(xiàn),只要它實(shí)現(xiàn)了合適的 __call__()
方法。例如:
class WSGIApplication:
def __init__(self):
...
def __call__(self, environ, start_response)
...
我們已經(jīng)在上面使用這種技術(shù)創(chuàng)建 PathDispatcher
類。 這個(gè)分發(fā)器僅僅只是管理一個(gè)字典,將(方法,路徑)對映射到處理器函數(shù)上面。 當(dāng)一個(gè)請求到來時(shí),它的方法和路徑被提取出來,然后被分發(fā)到對應(yīng)的處理器上面去。 另外,任何查詢變量會被解析后放到一個(gè)字典中,以 environ['params']
形式存儲。 后面這個(gè)步驟太常見,所以建議你在分發(fā)器里面完成,這樣可以省掉很多重復(fù)代碼。 使用分發(fā)器的時(shí)候,你只需簡單的創(chuàng)建一個(gè)實(shí)例,然后通過它注冊各種 WSGI 形式的函數(shù)。 編寫這些函數(shù)應(yīng)該超級簡單了,只要你遵循 start_response()
函數(shù)的編寫規(guī)則,并且最后返回字節(jié)字符串即可。
當(dāng)編寫這種函數(shù)的時(shí)候還需注意的一點(diǎn)就是對于字符串模板的使用。 沒人愿意寫那種到處混合著 print()
函數(shù) 、XML 和大量格式化操作的代碼。 我們上面使用了三引號包含的預(yù)先定義好的字符串模板。 這種方式的可以讓我們很容易的在以后修改輸出格式(只需要修改模板本身,而不用動任何使用它的地方)。
最后,使用 WSGI 還有一個(gè)很重要的部分就是沒有什么地方是針對特定 web 服務(wù)器的。 因?yàn)闃?biāo)準(zhǔn)對于服務(wù)器和框架是中立的,你可以將你的程序放入任何類型服務(wù)器中。 我們使用下面的代碼測試測試本節(jié)代碼:
if __name__ == '__main__':
from wsgiref.simple_server import make_server
# Create the dispatcher and register functions
dispatcher = PathDispatcher()
pass
# Launch a basic server
httpd = make_server('', 8080, dispatcher)
print('Serving on port 8080...')
httpd.serve_forever()
上面代碼創(chuàng)建了一個(gè)簡單的服務(wù)器,然后你就可以來測試下你的實(shí)現(xiàn)是否能正常工作。 最后,當(dāng)你準(zhǔn)備進(jìn)一步擴(kuò)展你的程序的時(shí)候,你可以修改這個(gè)代碼,讓它可以為特定服務(wù)器工作。
WSGI 本身是一個(gè)很小的標(biāo)準(zhǔn)。因此它并沒有提供一些高級的特性比如認(rèn)證、cookies、重定向等。 這些你自己實(shí)現(xiàn)起來也不難。不過如果你想要更多的支持,可以考慮第三方庫,比如WebOb
或者Paste
你想找到一個(gè)簡單的方式去執(zhí)行運(yùn)行在遠(yuǎn)程機(jī)器上面的 Python 程序中的函數(shù)或方法。
實(shí)現(xiàn)一個(gè)遠(yuǎn)程方法調(diào)用的最簡單方式是使用 XML-RPC。下面我們演示一下一個(gè)實(shí)現(xiàn)了鍵-值存儲功能的簡單服務(wù)器:
from xmlrpc.server import SimpleXMLRPCServer
class KeyValueServer:
_rpc_methods_ = ['get', 'set', 'delete', 'exists', 'keys']
def __init__(self, address):
self._data = {}
self._serv = SimpleXMLRPCServer(address, allow_none=True)
for name in self._rpc_methods_:
self._serv.register_function(getattr(self, name))
def get(self, name):
return self._data[name]
def set(self, name, value):
self._data[name] = value
def delete(self, name):
del self._data[name]
def exists(self, name):
return name in self._data
def keys(self):
return list(self._data)
def serve_forever(self):
self._serv.serve_forever()
# Example
if __name__ == '__main__':
kvserv = KeyValueServer(('', 15000))
kvserv.serve_forever()
下面我們從一個(gè)客戶端機(jī)器上面來訪問服務(wù)器:
>>> from xmlrpc.client import ServerProxy
>>> s = ServerProxy('http://localhost:15000', allow_none=True)
>>> s.set('foo', 'bar')
>>> s.set('spam', [1, 2, 3])
>>> s.keys()
['spam', 'foo']
>>> s.get('foo')
'bar'
>>> s.get('spam')
[1, 2, 3]
>>> s.delete('spam')
>>> s.exists('spam')
False
>>>
XML-RPC 可以讓我們很容易的構(gòu)造一個(gè)簡單的遠(yuǎn)程調(diào)用服務(wù)。你所需要做的僅僅是創(chuàng)建一個(gè)服務(wù)器實(shí)例, 通過它的方法 register_function()
來注冊函數(shù),然后使用方法 serve_forever()
啟動它。 在上面我們將這些步驟放在一起寫到一個(gè)類中,不夠這并不是必須的。比如你還可以像下面這樣創(chuàng)建一個(gè)服務(wù)器:
from xmlrpc.server import SimpleXMLRPCServer
def add(x,y):
return x+y
serv = SimpleXMLRPCServer(('', 15000))
serv.register_function(add)
serv.serve_forever()
XML-RPC 暴露出來的函數(shù)只能適用于部分?jǐn)?shù)據(jù)類型,比如字符串、整形、列表和字典。 對于其他類型就得需要做些額外的功課了。 例如,如果你想通過 XML-RPC 傳遞一個(gè)對象實(shí)例,實(shí)際上只有他的實(shí)例字典被處理:
>>> class Point:
... def __init__(self, x, y):
... self.x = x
... self.y = y
...
>>> p = Point(2, 3)
>>> s.set('foo', p)
>>> s.get('foo')
{'x': 2, 'y': 3}
>>>
類似的,對于二進(jìn)制數(shù)據(jù)的處理也跟你想象的不太一樣:
>>> s.set('foo', b'Hello World')
>>> s.get('foo')
<xmlrpc.client.Binary object at 0x10131d410>
>>> _.data
b'Hello World'
>>>
一般來講,你不應(yīng)該將 XML-RPC 服務(wù)以公共 API 的方式暴露出來。 對于這種情況,通常分布式應(yīng)用程序會是一個(gè)更好的選擇。
XML-RPC 的一個(gè)缺點(diǎn)是它的性能。SimpleXMLRPCServer
的實(shí)現(xiàn)是單線程的, 所以它不適合于大型程序,盡管我們在11.2小節(jié)中演示過它是可以通過多線程來執(zhí)行的。 另外,由于 XML-RPC 將所有數(shù)據(jù)都序列化為 XML 格式,所以它會比其他的方式運(yùn)行的慢一些。 但是它也有優(yōu)點(diǎn),這種方式的編碼可以被絕大部分其他編程語言支持。 通過使用這種方式,其他語言的客戶端程序都能訪問你的服務(wù)。
雖然 XML-RPC 有很多缺點(diǎn),但是如果你需要快速構(gòu)建一個(gè)簡單遠(yuǎn)程過程調(diào)用系統(tǒng)的話,它仍然值得去學(xué)習(xí)的。 有時(shí)候,簡單的方案就已經(jīng)足夠了。
你在不同的機(jī)器上面運(yùn)行著多個(gè) Python 解釋器實(shí)例,并希望能夠在這些解釋器之間通過消息來交換數(shù)據(jù)。
通過使用 multiprocessing.connection
模塊可以很容易的實(shí)現(xiàn)解釋器之間的通信。 下面是一個(gè)簡單的應(yīng)答服務(wù)器例子:
from multiprocessing.connection import Listener
import traceback
def echo_client(conn):
try:
while True:
msg = conn.recv()
conn.send(msg)
except EOFError:
print('Connection closed')
def echo_server(address, authkey):
serv = Listener(address, authkey=authkey)
while True:
try:
client = serv.accept()
echo_client(client)
except Exception:
traceback.print_exc()
echo_server(('', 25000), authkey=b'peekaboo')
然后客戶端連接服務(wù)器并發(fā)送消息的簡單示例:
>>> from multiprocessing.connection import Client
>>> c = Client(('localhost', 25000), authkey=b'peekaboo')
>>> c.send('hello')
>>> c.recv()
'hello'
>>> c.send(42)
>>> c.recv()
42
>>> c.send([1, 2, 3, 4, 5])
>>> c.recv()
[1, 2, 3, 4, 5]
>>>
跟底層 socket 不同的是,每個(gè)消息會完整保存(每一個(gè)通過 send()發(fā)送的對象能通過 recv()來完整接受)。 另外,所有對象會通過 pickle 序列化。因此,任何兼容 pickle 的對象都能在此連接上面被發(fā)送和接受。
目前有很多用來實(shí)現(xiàn)各種消息傳輸?shù)陌秃瘮?shù)庫,比如 ZeroMQ、Celery 等。 你還有另外一種選擇就是自己在底層 socket 基礎(chǔ)之上來實(shí)現(xiàn)一個(gè)消息傳輸層。 但是你想要簡單一點(diǎn)的方案,那么這時(shí)候 multiprocessing.connection
就派上用場了。 僅僅使用一些簡單的語句即可實(shí)現(xiàn)多個(gè)解釋器之間的消息通信。
如果你的解釋器運(yùn)行在同一臺機(jī)器上面,那么你可以使用另外的通信機(jī)制,比如 Unix 域套接字或者是 Windows 命名管道。 要想使用 UNIX 域套接字來創(chuàng)建一個(gè)連接,只需簡單的將地址改寫一個(gè)文件名即可:
s = Listener('/tmp/myconn', authkey=b'peekaboo')
要想使用 Windows 命名管道來創(chuàng)建連接,只需像下面這樣使用一個(gè)文件名:
s = Listener(r'\\.\pipe\myconn', authkey=b'peekaboo')
一個(gè)通用準(zhǔn)則是,你不要使用 multiprocessing
來實(shí)現(xiàn)一個(gè)對外的公共服務(wù)。 Client()
和 Listener()
中的 authkey
參數(shù)用來認(rèn)證發(fā)起連接的終端用戶。 如果密鑰不對會產(chǎn)生一個(gè)異常。此外,該模塊最適合用來建立長連接(而不是大量的短連接), 例如,兩個(gè)解釋器之間啟動后就開始建立連接并在處理某個(gè)問題過程中會一直保持連接狀態(tài)。
如果你需要對底層連接做更多的控制,比如需要支持超時(shí)、非阻塞 I/O 或其他類似的特性, 你最好使用另外的庫或者是在高層 socket 上來實(shí)現(xiàn)這些特性。
你想在一個(gè)消息傳輸層如 sockets
、multiprocessing connections
或 ZeroMQ
的基礎(chǔ)之上實(shí)現(xiàn)一個(gè)簡單的遠(yuǎn)程過程調(diào)用(RPC)。
將函數(shù)請求、參數(shù)和返回值使用 pickle 編碼后,在不同的解釋器直接傳送 pickle 字節(jié)字符串,可以很容易的實(shí)現(xiàn) RPC。 下面是一個(gè)簡單的 PRC 處理器,可以被整合到一個(gè)服務(wù)器中去:
# rpcserver.py
import pickle
class RPCHandler:
def __init__(self):
self._functions = { }
def register_function(self, func):
self._functions[func.__name__] = func
def handle_connection(self, connection):
try:
while True:
# Receive a message
func_name, args, kwargs = pickle.loads(connection.recv())
# Run the RPC and send a response
try:
r = self._functions[func_name](*args,**kwargs)
connection.send(pickle.dumps(r))
except Exception as e:
connection.send(pickle.dumps(e))
except EOFError:
pass
要使用這個(gè)處理器,你需要將它加入到一個(gè)消息服務(wù)器中。你有很多種選擇, 但是使用 multiprocessing
庫是最簡單的。下面是一個(gè) RPC 服務(wù)器例子:
from multiprocessing.connection import Listener
from threading import Thread
def rpc_server(handler, address, authkey):
sock = Listener(address, authkey=authkey)
while True:
client = sock.accept()
t = Thread(target=handler.handle_connection, args=(client,))
t.daemon = True
t.start()
# Some remote functions
def add(x, y):
return x + y
def sub(x, y):
return x - y
# Register with a handler
handler = RPCHandler()
handler.register_function(add)
handler.register_function(sub)
# Run the server
rpc_server(handler, ('localhost', 17000), authkey=b'peekaboo')
為了從一個(gè)遠(yuǎn)程客戶端訪問服務(wù)器,你需要?jiǎng)?chuàng)建一個(gè)對應(yīng)的用來傳送請求的 RPC 代理類。例如
import pickle
class RPCProxy:
def __init__(self, connection):
self._connection = connection
def __getattr__(self, name):
def do_rpc(*args, **kwargs):
self._connection.send(pickle.dumps((name, args, kwargs)))
result = pickle.loads(self._connection.recv())
if isinstance(result, Exception):
raise result
return result
return do_rpc
要使用這個(gè)代理類,你需要將其包裝到一個(gè)服務(wù)器的連接上面,例如:
>>> from multiprocessing.connection import Client
>>> c = Client(('localhost', 17000), authkey=b'peekaboo')
>>> proxy = RPCProxy(c)
>>> proxy.add(2, 3)
5
>>> proxy.sub(2, 3)
-1
>>> proxy.sub([1, 2], 4)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "rpcserver.py", line 37, in do_rpc
raise result
TypeError: unsupported operand type(s) for -: 'list' and 'int'
>>>
要注意的是很多消息層(比如 multiprocessing
)已經(jīng)使用 pickle 序列化了數(shù)據(jù)。 如果是這樣的話,對 pickle.dumps()
和 pickle.loads()
的調(diào)用要去掉。
RPCHandler
和 RPCProxy
的基本思路是很比較簡單的。 如果一個(gè)客戶端想要調(diào)用一個(gè)遠(yuǎn)程函數(shù),比如 foo(1, 2, z=3)
,代理類創(chuàng)建一個(gè)包含了函數(shù)名和參數(shù)的元組 ('foo', (1, 2), {'z': 3})
。 這個(gè)元組被 pickle 序列化后通過網(wǎng)絡(luò)連接發(fā)生出去。 這一步在 RPCProxy
的__getattr__()
方法返回的 do_rpc()
閉包中完成。 服務(wù)器接收后通過 pickle 反序列化消息,查找函數(shù)名看看是否已經(jīng)注冊過,然后執(zhí)行相應(yīng)的函數(shù)。 執(zhí)行結(jié)果(或異常)被pickle序列化后返回發(fā)送給客戶端。我們的實(shí)例需要依賴 multiprocessing
進(jìn)行通信。 不過,這種方式可以適用于其他任何消息系統(tǒng)。例如,如果你想在 ZeroMQ 之上實(shí)習(xí) RPC, 僅僅只需要將連接對象換成合適的 ZeroMQ 的 socket 對象即可。
由于底層需要依賴 pickle,那么安全問題就需要考慮了 (因?yàn)橐粋€(gè)聰明的黑客可以創(chuàng)建特定的消息,能夠讓任意函數(shù)通過 pickle 反序列化后被執(zhí)行)。 因此你永遠(yuǎn)不要允許來自不信任或未認(rèn)證的客戶端的 RPC。特別是你絕對不要允許來自 Internet 的任意機(jī)器的訪問, 這種只能在內(nèi)部被使用,位于防火墻后面并且不要對外暴露。
作為 pickle 的替代,你也許可以考慮使用 JSON、XML 或一些其他的編碼格式來序列化消息。 例如,本機(jī)實(shí)例可以很容易的改寫成 JSON 編碼方案。還需要將 pickle.loads()
和 pickle.dumps()
替換成 json.loads()
和 json.dumps()
即可:
# jsonrpcserver.py
import json
class RPCHandler:
def __init__(self):
self._functions = { }
def register_function(self, func):
self._functions[func.__name__] = func
def handle_connection(self, connection):
try:
while True:
# Receive a message
func_name, args, kwargs = json.loads(connection.recv())
# Run the RPC and send a response
try:
r = self._functions[func_name](*args,**kwargs)
connection.send(json.dumps(r))
except Exception as e:
connection.send(json.dumps(str(e)))
except EOFError:
pass
# jsonrpcclient.py
import json
class RPCProxy:
def __init__(self, connection):
self._connection = connection
def __getattr__(self, name):
def do_rpc(*args, **kwargs):
self._connection.send(json.dumps((name, args, kwargs)))
result = json.loads(self._connection.recv())
return result
return do_rpc
實(shí)現(xiàn) RPC 的一個(gè)比較復(fù)雜的問題是如何去處理異常。至少,當(dāng)方法產(chǎn)生異常時(shí)服務(wù)器不應(yīng)該奔潰。 因此,返回給客戶端的異常所代表的含義就要好好設(shè)計(jì)了。 如果你使用 pickle,異常對象實(shí)例在客戶端能被反序列化并拋出。如果你使用其他的協(xié)議,那得想想另外的方法了。 不過至少,你應(yīng)該在響應(yīng)中返回異常字符串。我們在 JSON 的例子中就是使用的這種方式。
對于其他的 RPC 實(shí)現(xiàn)例子,我推薦你看看在 XML-RPC 中使用的 SimpleXMLRPCServer
和 ServerProxy
的實(shí)現(xiàn), 也就是11.6小節(jié)中的內(nèi)容。