鍍金池/ 教程/ Python/ Python 通過 amqp 消息隊列協(xié)議中的 Qpid 實現(xiàn)數(shù)據(jù)通信
通過 memcached 實現(xiàn)領號排隊功能及 python 隊列實例
利用 pypy 提高 python 腳本的執(zhí)行速度及測試性能
Python FAQ3-python 中 的原始(raw)字符串
Mongodb 千萬級數(shù)據(jù)在 python 下的綜合壓力測試及應用探討
Parallel Python 實現(xiàn)程序的并行多 cpu 多核利用【pp 模塊】
python simplejson 模塊淺談
服務端 socket 開發(fā)之多線程和 gevent 框架并發(fā)測試[python 語言]
python Howto 之 logging 模塊
python 之 MySQLdb 庫的使用
關于 python 調用 zabbix api 接口的自動化實例 [結合 saltstack]
python 之利用 PIL 庫實現(xiàn)頁面的圖片驗證碼及縮略圖
Python 通過 amqp 消息隊列協(xié)議中的 Qpid 實現(xiàn)數(shù)據(jù)通信
python 中用 string.maketrans 和 translate 巧妙替換字符串
python linecache 模塊讀取文件用法詳解
Python 批量更新 nginx 配置文件
python 計算文件的行數(shù)和讀取某一行內容的實現(xiàn)方法
python+Django 實現(xiàn) Nagios 自動化添加監(jiān)控項目
多套方案來提高 python web 框架的并發(fā)處理能力
python 寫報警程序中的聲音實現(xiàn) winsound
python 調用 zabbix 的 api 接口添加主機、查詢組、主機、模板
對 Python-memcache 分布式散列和調用的實現(xiàn)
使用 python 構建基于 hadoop 的 mapreduce 日志分析平臺
一個腳本講述 python 語言的基礎規(guī)范,適合初學者
Python 編寫的 socket 服務器和客戶端
如何將 Mac OS X10.9 下的 Python2.7 升級到最新的 Python3.3
python 監(jiān)控文件或目錄變化
報警監(jiān)控平臺擴展功能 url 回調的設計及應用 [python 語言]
Python 處理 cassandra 升級后的回滾腳本
python 實現(xiàn) select 和 epoll 模型 socket 網(wǎng)絡編程
關于 B+tree (附 python 模擬代碼)
通過 python 和 websocket 構建實時通信系統(tǒng)[擴展 saltstack 監(jiān)控]

Python 通過 amqp 消息隊列協(xié)議中的 Qpid 實現(xiàn)數(shù)據(jù)通信

簡介:

這兩天看了消息隊列通信,打算在配置平臺上應用起來。以前用過 zeromq 但是這東西太快了,還有就是 rabbitmq 有點大,新浪的朋友推薦了 qpid,簡單輕便。自己總結了下文檔,大家可以瞅瞅。

AMQP(消息隊列協(xié)議 Advanced Message Queuing Protocol)是一種消息協(xié)議 ,等同于 JMS,但是 JMS 只是 java 平臺的方案,AMQP 是一個跨語言的協(xié)議。

AMQP 不分語言平臺,主流的語言都支持,運維這邊的 perl,python,ruby 更是支持,所以大家就放心用吧。

主流的消息隊列通信類型:

點對點:A 發(fā)消息給 B。
廣播:A 發(fā)給所有其他人的消息
組播:A 發(fā)給多個但不是所有其他人的消息。
Requester/response:類似訪問網(wǎng)頁的通信方式,客戶端發(fā)請求并等待,服務端回復該請求
Pub-sub:類似雜志發(fā)行,出版雜志的人并不知道誰在看這本雜志,訂閱的人并不關心誰在發(fā)表這本雜志。出版的人只管將信息發(fā)布出去,訂閱的人也只在需要的時候收到該信息。
Store-and-forward:存儲轉發(fā)模型類似信件投遞,寫信的人將消息寫給某人,但在將信件發(fā)出的時候,收信的人并不一定在家等待,也并不知道有消息給他。但這個消息不會丟失,會放在收信者的信箱中。這種模型允許信息的異步交換。
其他通信模型。。。

Publisher --->Exchange ---> MessageQueue --->Consumer

整個過程是異步的.Publisher,Consumer 相互不知道對方的存在,Exchange 負責交換/路由,依靠 Routing Key,每個消息者有一個 Routing Key,每個 Binding 將自已感興趣的 RoutingKey 告訴 Exchange,以便 Exchange 將相關的消息轉發(fā)給相應的 Queue!

幾個概念

幾個概念
Producer,Routing Key,Exchange,Binding,Queue,Consumer.
Producer: 消息的創(chuàng)建者,消息的發(fā)送者
Routing Key:唯一用來映射消息該進入哪個隊列的標識
Exchange:負責消息的路由,交換
Binding:定義 Queue 和 Exchange 的映射關系
Queue:消息隊列
Consumer:消息的使用者
Exchange類型
Fan-Out:類似于廣播方式,不管 RoutingKey
Direct:根據(jù) RoutingKey,進行關聯(lián)投寄
Topic:類似于 Direct,但是支持多個 Key 關聯(lián),以組的方式投寄。
      key以.來定義界限。類似于 usea.news,usea.weather.這兩個消息是一組的。

http://wiki.jikexueyuan.com/project/python-actual-combat/images/81.jpg" alt="pic" />

QPID

Qpid 是 Apache 開發(fā)的一款面向對象的消息中間件,它是一個 AMQP 的實現(xiàn),可以和其他符合 AMQP 協(xié)議的系統(tǒng)進行通信。Qpid 提供了 C++/Python/Java/C# 等主流編程語言的客戶端庫,安裝使用非常方便。相對于其他的 AMQP 實現(xiàn),Qpid 社區(qū)十分活躍,有望成為標準 AMQP 中間件產(chǎn)品。除了符合 AMQP 基本要求之外,Qpid 提供了很多額外的 HA 特性,非常適于集群環(huán)境下的消息通信!

基本功能外提供以下特性:

采用 Corosync(?)來保證集群環(huán)境下的 Fault-tolerant(?) 特性
支持 XML 的 Exchange,消息為 XML 時,彩用 Xquery 過濾
支持 plugin
提供安全認證,可對 producer/consumer 提供身份認證
qpidd --port --no-data-dir --auth
port:端口
--no-data-dir:不指定數(shù)據(jù)目錄
--auth:不啟用安全身份認證

啟動后自動創(chuàng)建一些 Exchange,amp.topic,amp.direct,amp.fanout

tools:

Qpid-config:維護 Queue,Exchange,內部配置 Qpid-route:配置 broker Federation(聯(lián)盟?集群?) Qpid-tool:監(jiān)控

咱們說完介紹了,這里就趕緊測試下。

服務器端的安裝:

yum install qpid-cpp-server
yum install qpid-tools
/etc/init.d/qpidd start

發(fā)布端的測試代碼:

http://wiki.jikexueyuan.com/project/python-actual-combat/images/82.jpg" alt="pic" />

一些測試代碼轉自: ibm 的開發(fā)社區(qū)

#!/usr/bin/env python
#xiaorui.cc
#轉自 ibm 開發(fā)社區(qū)
import optparse, time
from qpid.messaging import *
from qpid.util import URL
from qpid.log import enable, DEBUG, WARN
def nameval(st):
  idx = st.find("=")
  if idx >= 0:
    name = st[0:idx]
    value = st[idx+1:]
  else:
    name = st
    value = None
  return name, value
parser = optparse.OptionParser(usage="usage: %prog [options] ADDRESS [ CONTENT ... ]",
                               description="Send messages to the supplied address.")
parser.add_option("-b", "--broker", default="localhost",
                  help="connect to specified BROKER (default %default)")
parser.add_option("-r", "--reconnect", action="store_true",
                  help="enable auto reconnect")
parser.add_option("-i", "--reconnect-interval", type="float", default=3,
                  help="interval between reconnect attempts")
parser.add_option("-l", "--reconnect-limit", type="int",
                  help="maximum number of reconnect attempts")
parser.add_option("-c", "--count", type="int", default=1,
                  help="stop after count messages have been sent, zero disables (default %default)")
parser.add_option("-t", "--timeout", type="float", default=None,
                  help="exit after the specified time")
parser.add_option("-I", "--id", help="use the supplied id instead of generating one")
parser.add_option("-S", "--subject", help="specify a subject")
parser.add_option("-R", "--reply-to", help="specify reply-to address")
parser.add_option("-P", "--property", dest="properties", action="append", default=[],
                  metavar="NAME=VALUE", help="specify message property")
parser.add_option("-M", "--map", dest="entries", action="append", default=[],
                  metavar="KEY=VALUE",
                  help="specify map entry for message body")
parser.add_option("-v", dest="verbose", action="store_true",
                  help="enable logging")
opts, args = parser.parse_args()
if opts.verbose:
  enable("qpid", DEBUG)
else:
  enable("qpid", WARN)
if opts.id is None:
  spout_id = str(uuid4())
else:
  spout_id = opts.id
if args:
  addr = args.pop(0)
else:
  parser.error("address is required")
content = None
if args:
  text = " ".join(args)
else:
  text = None
if opts.entries:
  content = {}
  if text:
    content["text"] = text
  for e in opts.entries:
    name, val = nameval(e)
    content[name] = val
else:
  content = text
conn = Connection(opts.broker,
                  reconnect=opts.reconnect,
                  reconnect_interval=opts.reconnect_interval,
                  reconnect_limit=opts.reconnect_limit)
try:
  conn.open()
  ssn = conn.session()
  snd = ssn.sender(addr)
  count = 0
  start = time.time()
  while (opts.count == 0 or count < opts.count) and \
        (opts.timeout is None or time.time() - start < opts.timeout):
    msg = Message(subject=opts.subject,
                  reply_to=opts.reply_to,
                  content=content)
    msg.properties["spout-id"] = "%s:%s" % (spout_id, count)
    for p in opts.properties:
      name, val = nameval(p)
      msg.properties[name] = val
    snd.send(msg)
    count += 1
    print msg
except SendError, e:
  print e
except KeyboardInterrupt:
  pass
conn.close()

客戶端的測試代碼:

http://wiki.jikexueyuan.com/project/python-actual-combat/images/83.jpg" alt="pic" />

#!/usr/bin/env python
#xiaorui.cc
##轉自 ibm 開發(fā)社區(qū)
import optparse
from qpid.messaging import *
from qpid.util import URL
from qpid.log import enable, DEBUG, WARN
parser = optparse.OptionParser(usage="usage: %prog [options] ADDRESS ...",
                               description="Drain messages from the supplied address.")
parser.add_option("-b", "--broker", default="localhost",
                  help="connect to specified BROKER (default %default)")
parser.add_option("-c", "--count", type="int",
                  help="number of messages to drain")
parser.add_option("-f", "--forever", action="store_true",
                  help="ignore timeout and wait forever")
parser.add_option("-r", "--reconnect", action="store_true",
                  help="enable auto reconnect")
parser.add_option("-i", "--reconnect-interval", type="float", default=3,
                  help="interval between reconnect attempts")
parser.add_option("-l", "--reconnect-limit", type="int",
                  help="maximum number of reconnect attempts")
parser.add_option("-t", "--timeout", type="float", default=0,
                  help="timeout in seconds to wait before exiting (default %default)")
parser.add_option("-p", "--print", dest="format", default="%(M)s",
                  help="format string for printing messages (default %default)")
parser.add_option("-v", dest="verbose", action="store_true",
                  help="enable logging")
opts, args = parser.parse_args()
if opts.verbose:
  enable("qpid", DEBUG)
else:
  enable("qpid", WARN)
if args:
  addr = args.pop(0)
else:
  parser.error("address is required")
if opts.forever:
  timeout = None
else:
  timeout = opts.timeout
class Formatter:
  def __init__(self, message):
    self.message = message
    self.environ = {"M": self.message,
                    "P": self.message.properties,
                    "C": self.message.content}
  def __getitem__(self, st):
    return eval(st, self.environ)
conn = Connection(opts.broker,
                  reconnect=opts.reconnect,
                  reconnect_interval=opts.reconnect_interval,
                  reconnect_limit=opts.reconnect_limit)
try:
  conn.open()
  ssn = conn.session()
  rcv = ssn.receiver(addr)
  count = 0
  while not opts.count or count < opts.count:
    try:
      msg = rcv.fetch(timeout=timeout)
      print opts.format % Formatter(msg)
      count += 1
      ssn.acknowledge()
    except Empty:
      break
except ReceiverError, e:
  print e
except KeyboardInterrupt:
  pass
conn.close()

Browse 模式的意思是,瀏覽的意思,一個特殊的需求,我訪問了一次,別人也能訪問。

Consume 模式的意思是,我瀏覽了一次后,刪除這一條。別人就訪問不到啦。

這個是瀏覽模式:

http://wiki.jikexueyuan.com/project/python-actual-combat/images/84.jpg" alt="pic" />

sub-pub 通信的例子

Pub-sub 是另一種很有用的通信模型??峙滤拿志驮从诔霭姘l(fā)行這種現(xiàn)實中的信息傳遞方式吧,publisher 就是出版商,subscriber 就是訂閱者。

服務端
qpid-config add exchange topic news-service
./spout news-service/news xiaorui.cc
客戶端:
./drain -t 120 news-service/#.news

PUB 端,創(chuàng)建 TOPIC 點!

http://wiki.jikexueyuan.com/project/python-actual-combat/images/85.jpg" alt="pic" />

SUB端,也就是接收端!

http://wiki.jikexueyuan.com/project/python-actual-combat/images/86.jpg" alt="pic" />

總結:

qpid 挺好用的,比 rabbitmq 要輕型,比 zeromq 保險點! 各方面的文檔也都很健全,值得一用。話說,這三個消息隊列我也都用過,最一開始用的是 redis 的 pubsub 做日志收集和信息通知,后來在做集群相關的項目的時候,我自己搞了一套 zeromq 的分布式任務分發(fā),和 saltstack 很像,當然了遠沒有萬人用的 salt 強大。 rabbitmq 的用法,更是看中他的安全和持久化,當然性能真的不咋地。
關于 qpid 的性能我沒有親自做大量的測試,但是聽朋友說,加持久化可以到 7k,不加持久化可以到 1500 左右。

本文出自 “峰云,就她了?!?博客,謝絕轉載!