鍍金池/ 教程/ Python/ 多套方案來提高 python web 框架的并發(fā)處理能力
通過 memcached 實現(xiàn)領(lǐng)號排隊功能及 python 隊列實例
利用 pypy 提高 python 腳本的執(zhí)行速度及測試性能
Python FAQ3-python 中 的原始(raw)字符串
Mongodb 千萬級數(shù)據(jù)在 python 下的綜合壓力測試及應用探討
Parallel Python 實現(xiàn)程序的并行多 cpu 多核利用【pp 模塊】
python simplejson 模塊淺談
服務端 socket 開發(fā)之多線程和 gevent 框架并發(fā)測試[python 語言]
python Howto 之 logging 模塊
python 之 MySQLdb 庫的使用
關(guān)于 python 調(diào)用 zabbix api 接口的自動化實例 [結(jié)合 saltstack]
python 之利用 PIL 庫實現(xiàn)頁面的圖片驗證碼及縮略圖
Python 通過 amqp 消息隊列協(xié)議中的 Qpid 實現(xiàn)數(shù)據(jù)通信
python 中用 string.maketrans 和 translate 巧妙替換字符串
python linecache 模塊讀取文件用法詳解
Python 批量更新 nginx 配置文件
python 計算文件的行數(shù)和讀取某一行內(nèi)容的實現(xiàn)方法
python+Django 實現(xiàn) Nagios 自動化添加監(jiān)控項目
多套方案來提高 python web 框架的并發(fā)處理能力
python 寫報警程序中的聲音實現(xiàn) winsound
python 調(diào)用 zabbix 的 api 接口添加主機、查詢組、主機、模板
對 Python-memcache 分布式散列和調(diào)用的實現(xiàn)
使用 python 構(gòu)建基于 hadoop 的 mapreduce 日志分析平臺
一個腳本講述 python 語言的基礎規(guī)范,適合初學者
Python 編寫的 socket 服務器和客戶端
如何將 Mac OS X10.9 下的 Python2.7 升級到最新的 Python3.3
python 監(jiān)控文件或目錄變化
報警監(jiān)控平臺擴展功能 url 回調(diào)的設計及應用 [python 語言]
Python 處理 cassandra 升級后的回滾腳本
python 實現(xiàn) select 和 epoll 模型 socket 網(wǎng)絡編程
關(guān)于 B+tree (附 python 模擬代碼)
通過 python 和 websocket 構(gòu)建實時通信系統(tǒng)[擴展 saltstack 監(jiān)控]

多套方案來提高 python web 框架的并發(fā)處理能力

Python 常見部署方法有 :

  1. fcgi :用 spawn-fcgi 或者框架自帶的工具對各個 project 分別生成監(jiān)聽進程,然后和 http 服務互動
  2. wsgi :利用 http 服務的 mod_wsgi 模塊來跑各個 project(Web 應用程序或框架簡單而通用的 Web 服務器 之間的接口)。
  3. uWSGI 是一款像 php-cgi 一樣監(jiān)聽同一端口,進行統(tǒng)一管理和負載平衡的工具,uWSGI,既不用 wsgi 協(xié)議也不用 fcgi 協(xié)議,而是自創(chuàng)了一個 uwsgi 的協(xié)議,據(jù)說該協(xié)議大約是 fcgi 協(xié)議的 10 倍那么快。

其實 WSGI 是分成 server 和 framework (即 application) 兩部分 (當然還有 middleware)。嚴格說 WSGI 只是一個協(xié)議, 規(guī)范 server 和 framework 之間連接的接口。

WSGI server 把服務器功能以 WSGI 接口暴露出來。比如 mod_wsgi 是一種 server, 把 apache 的功能以 WSGI 接口的形式提供出來。

  1. WSGI framework 就是我們經(jīng)常提到的 Django 這種框架。不過需要注意的是, 很少有單純的 WSGI framework , 基于 WSGI 的框架往往都自帶 WSGI server。比如 Django、CherryPy 都自帶 WSGI server 主要是測試用途, 發(fā)布時則使用生產(chǎn)環(huán)境的 WSGI server。而有些 WSGI 下的框架比如 pylons、bfg 等, 自己不實現(xiàn) WSGI server。使用 paste 作為 WSGI server。
  2. Paste 是流行的 WSGI server, 帶有很多中間件。還有 flup 也是一個提供中間件的庫。
  3. 搞清除 WSGI server 和 application, 中間件自然就清楚了。除了 session、cache 之類的應用, 前段時間看到一個 bfg 下的中間件專門用于給網(wǎng)站換膚的 (skin) 。中間件可以想到的用法還很多。
  4. 這里再補充一下, 像 django 這樣的框架如何以 fastcgi 的方式跑在 apache 上的。這要用到 flup.fcgi 或者 fastcgi.py (eurasia 中也設計了一個 fastcgi.py 的實現(xiàn)) 這些工具, 它們就是把 fastcgi 協(xié)議轉(zhuǎn)換成 WSGI 接口 (把 fastcgi 變成一個 WSGI server) 供框架接入。整個架構(gòu)是這樣的: django -> fcgi2wsgiserver -> mod_fcgi -> apache 。
  5. 雖然我不是 WSGI 的粉絲, 但是不可否認 WSGI 對 python web 的意義重大。有意自己設計 web 框架, 又不想做 socket 層和 http 報文解析的同學, 可以從 WSGI 開始設計自己的框架。在 python 圈子里有個共識, 自己隨手搞個 web 框架跟喝口水一樣自然, 非常方便?;蛟S每個 python 玩家都會經(jīng)歷一個倒騰框架的

uWSGI 的主要特點如下:

  • 超快的性能。
  • 低內(nèi)存占用(實測為 apache2 的 mod_wsgi 的一半左右)。
  • 多app管理。
  • 詳盡的日志功能(可以用來分析 app 性能和瓶頸)。
  • 高度可定制(內(nèi)存大小限制,服務一定次數(shù)后重啟等)。

uwsgi 的官方文檔:

http://projects.unbit.it/uwsgi/wiki/Doc

nginx.conf

location / {
  include uwsgi_params
  uwsgi_pass 127.0.0.1:9090
}

啟動 app

uwsgi -s :9090 -w myapp

uwsgi 的調(diào)優(yōu)參數(shù)~

uwsgi 的參數(shù)
以上是單個 project 的最簡單化部署,uwsgi 還是有很多令人稱贊的功能的,例如:
并發(fā) 4 個線程:
  uwsgi -s :9090 -w myapp -p 4
主控制線程 +4 個線程:
  uwsgi -s :9090 -w myapp -M -p 4
執(zhí)行超過 30 秒的 client 直接放棄:
  uwsgi -s :9090 -w myapp -M -p 4 -t 30
限制內(nèi)存空間 128M:
  uwsgi -s :9090 -w myapp -M -p 4 -t 30 --limit-as 128
服務超過 10000 個 req 自動 respawn:
  uwsgi -s :9090 -w myapp -M -p 4 -t 30 --limit-as 128 -R 10000
后臺運行等:
  uwsgi -s :9090 -w myapp -M -p 4 -t 30 --limit-as 128 -R 10000 -d uwsgi.log

為了讓多個站點共享一個 uwsgi 服務,必須把 uwsgi 運行成虛擬站點:去掉“-w myapp”加上”–vhost”:

uwsgi -s :9090 -M -p 4 -t 30 --limit-as 128 -R 10000 -d uwsgi.log --vhost

然后必須配置 virtualenv,virtualenv 是 Python 的一個很有用的虛擬環(huán)境工具,這樣安裝:

最后配置 nginx,注意每個站點必須單獨占用一個 server,同一 server 不同 location 定向到不同的應用不知為何總是失敗,估計也 算是一個 bug。

server {
    listen       80;
    server_name  app1.mydomain.com;
    location / {
            include uwsgi_params;
            uwsgi_pass 127.0.0.1:9090;
            uwsgi_param UWSGI_PYHOME /var/www/myenv;
            uwsgi_param UWSGI_SCRIPT myapp1;
            uwsgi_param UWSGI_CHDIR /var/www/myappdir1;
     }
}
server {
    listen       80;
    server_name  app2.mydomain.com;
    location / {
            include uwsgi_params;
            uwsgi_pass 127.0.0.1:9090;
            uwsgi_param UWSGI_PYHOME /var/www/myenv;
            uwsgi_param UWSGI_SCRIPT myapp2;
            uwsgi_param UWSGI_CHDIR /var/www/myappdir2;
    }
}

這樣,重啟 nginx 服務,兩個站點就可以共用一個 uwsgi 服務了。

再來搞下 fastcgi 的方式

location / {
        fastcgi_param REQUEST_METHOD $request_method;
        fastcgi_param QUERY_STRING $query_string;
        fastcgi_param CONTENT_TYPE $content_type;
        fastcgi_param CONTENT_LENGTH $content_length;
        fastcgi_param GATEWAY_INTERFACE CGI/1.1;
        fastcgi_param SERVER_SOFTWARE nginx/$nginx_version;
        fastcgi_param REMOTE_ADDR $remote_addr;
        fastcgi_param REMOTE_PORT $remote_port;
        fastcgi_param SERVER_ADDR $server_addr;
        fastcgi_param SERVER_PORT $server_port;
        fastcgi_param SERVER_NAME $server_name;
        fastcgi_param SERVER_PROTOCOL $server_protocol;
        fastcgi_param SCRIPT_FILENAME $fastcgi_script_name;
        fastcgi_param PATH_INFO $fastcgi_script_name;
        fastcgi_pass 127.0.0.1:9002;
}
location /static/ {
        root /path/to/www;
        if (-f $request_filename) {
           rewrite ^/static/(.*)$  /static/$1 break;
        }
    }

啟動一個 fastcgi 的進程

spawn-fcgi -d /path/to/www -f /path/to/www/index.py -a 127.0.0.1 -p 9002

用 web.py 寫的一個小 demo 測試

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import web
urls = ("/.*", "hello")
app = web.application(urls, globals())
class hello:
    def GET(self):
        return 'Hello, world!'
if __name__ == "__main__":
    web.wsgi.runwsgi = lambda func, addr=None: web.wsgi.runfcgi(func, addr)
    app.run()

啟動 nginx

nginx

這樣就 ok 了~

下面開始介紹下 我一般用的方法:

http://wiki.jikexueyuan.com/project/python-actual-combat/images/2.jpg" alt="pic" />

前端 nginx 用負責負載分發(fā):

部署的時候采用了單 IP 多端口方式,服務器有 4 個核心,決定開 4 個端口對應,分別是 8885~8888,修改

upstream backend {
        server 127.0.0.1:8888;
        server 127.0.0.1:8887;
        server 127.0.0.1:8886;
        server 127.0.0.1:8885;
}
 server{
        listen  80;
        server_name message.test.com;
        keepalive_timeout 65;    #
        proxy_read_timeout 2000; #
        sendfile on;
        tcp_nopush on;
        tcp_nodelay on;
    location / {
        proxy_pass_header Server;
        proxy_set_header Host $http_host;
        proxy_redirect off;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Scheme $scheme;
        proxy_pass  http://backend;
        }
}

然后運行四個 python 程序,端口為咱們配置好的端口

我這里用 tornado 寫了一個執(zhí)行系統(tǒng)程序的例子:

import subprocess
import tornado.ioloop
import time
import fcntl
import functools
import os
class GenericSubprocess (object):
    def __init__ ( self, timeout=-1, **popen_args ):
        self.args = dict()
        self.args["stdout"] = subprocess.PIPE
        self.args["stderr"] = subprocess.PIPE
        self.args["close_fds"] = True
        self.args.update(popen_args)
        self.ioloop = None
        self.expiration = None
        self.pipe = None
        self.timeout = timeout
        self.streams = []
        self.has_timed_out = False
    def start(self):
        """Spawn the task.
        Throws RuntimeError if the task was already started."""
        if not self.pipe is None:
            raise RuntimeError("Cannot start task twice")
        self.ioloop = tornado.ioloop.IOLoop.instance()
        if self.timeout > 0:
            self.expiration = self.ioloop.add_timeout( time.time() + self.timeout, self.on_timeout )
        self.pipe = subprocess.Popen(**self.args)
        self.streams = [ (self.pipe.stdout.fileno(), []),
                             (self.pipe.stderr.fileno(), []) ]
        for fd, d in self.streams:
            flags = fcntl.fcntl(fd, fcntl.F_GETFL)| os.O_NDELAY
            fcntl.fcntl( fd, fcntl.F_SETFL, flags)
            self.ioloop.add_handler( fd,
                                     self.stat,
                                     self.ioloop.READ|self.ioloop.ERROR)
    def on_timeout(self):
        self.has_timed_out = True
        self.cancel()
    def cancel (self ) :
        """Cancel task execution
        Sends SIGKILL to the child process."""
        try:
            self.pipe.kill()
        except:
            pass
    def stat( self, *args ):
        '''Check process completion and consume pending I/O data'''
        self.pipe.poll()
        if not self.pipe.returncode is None:
            '''cleanup handlers and timeouts'''
            if not self.expiration is None:
                self.ioloop.remove_timeout(self.expiration)
            for fd, dest in  self.streams:
                self.ioloop.remove_handler(fd)
            '''schedulle callback (first try to read all pending data)'''
            self.ioloop.add_callback(self.on_finish)
        for fd, dest in  self.streams:
            while True:
                try:
                    data = os.read(fd, 4096)
                    if len(data) == 0:
                        break
                    dest.extend([data])
                except:
                    break
    @property
    def stdout(self):
        return self.get_output(0)
    @property
    def stderr(self):
        return self.get_output(1)
    @property
    def status(self):
        return self.pipe.returncode
    def get_output(self, index ):
        return "".join(self.streams[index][1])
    def on_finish(self):
        raise NotImplemented()
class Subprocess (GenericSubprocess):
    """Create new instance
    Arguments:
        callback: method to be called after completion. This method should take 3 arguments: statuscode(int), stdout(str), stderr(str), has_timed_out(boolean)
        timeout: wall time allocated for the process to complete. After this expires Task.cancel is called. A negative timeout value means no limit is set
    The task is not started until start is called. The process will then be spawned using subprocess.Popen(**popen_args). The stdout and stderr are always set to subprocess.PIPE.
    """
    def __init__ ( self, callback, *args, **kwargs):
        """Create new instance
        Arguments:
            callback: method to be called after completion. This method should take 3 arguments: statuscode(int), stdout(str), stderr(str), has_timed_out(boolean)
            timeout: wall time allocated for the process to complete. After this expires Task.cancel is called. A negative timeout value means no limit is set
        The task is not started until start is called. The process will then be spawned using subprocess.Popen(**popen_args). The stdout and stderr are always set to subprocess.PIPE.
        """
        self.callback = callback
        self.done_callback = False
        GenericSubprocess.__init__(self, *args, **kwargs)
    def on_finish(self):
        if not self.done_callback:
            self.done_callback = True
            '''prevent calling callback twice'''
            self.ioloop.add_callback(functools.partial(self.callback, self.status, self.stdout, self.stderr, self.has_timed_out))
if __name__ == "__main__":
    ioloop = tornado.ioloop.IOLoop.instance()
    def print_timeout( status, stdout, stderr, has_timed_out) :
        assert(status!=0)
        assert(has_timed_out)
        print "OK status:", repr(status), "stdout:", repr(stdout), "stderr:", repr(stderr), "timeout:", repr(has_timed_out)
    def print_ok( status, stdout, stderr, has_timed_out) :
        assert(status==0)
        assert(not has_timed_out)
        print "OK status:", repr(status), "stdout:", repr(stdout), "stderr:", repr(stderr), "timeout:", repr(has_timed_out)
    def print_error( status, stdout, stderr, has_timed_out):
        assert(status!=0)
        assert(not has_timed_out)
        print "OK status:", repr(status), "stdout:", repr(stdout), "stderr:", repr(stderr), "timeout:", repr(has_timed_out)
    def stop_test():
        ioloop.stop()
    t1 = Subprocess( print_timeout, timeout=3, args=[ "sleep", "5" ] )
    t2 = Subprocess( print_ok, timeout=3, args=[ "sleep", "1" ] )
    t3 = Subprocess( print_ok, timeout=3, args=[ "sleepdsdasdas", "1" ] )
    t4 = Subprocess( print_error, timeout=3, args=[ "cat", "/etc/sdfsdfsdfsdfsdfsdfsdf" ] )
    t1.start()
    t2.start()
    try:
        t3.start()
        assert(false)
    except:
        print "OK"
    t4.start()
    ioloop.add_timeout(time.time() + 10, stop_test)
    ioloop.start()

大家可以先用 uwsgi,要還是有壓力和堵塞的話,可以用用 nginx 做負載。

我自己的經(jīng)驗來看還是這個靠譜~