|
#
系統(tǒng)中,用戶的消息在移動(dòng)設(shè)備與接入服務(wù)器建立的Tcp長連接上傳遞。這些消息包括:文本,復(fù)合文本,位置信息,音頻剪輯,圖像等等。 發(fā)送者傳送消息到平臺系統(tǒng)內(nèi)部并將消息寫入gridfs,待接收者上線時(shí)平臺將消息推送至接收者。 考慮到帶寬利用,接收者得到的消息將不包含二進(jìn)制數(shù)據(jù),例如: 音頻,圖像等等。 這要求接收者對平臺發(fā)起一次獲取消息包內(nèi)指定的音頻和圖像數(shù)據(jù)的請求。 移動(dòng)端向平臺請求二進(jìn)制數(shù)據(jù)的情況還包含 【離線文件傳送】場景 。 二進(jìn)制數(shù)據(jù)往往是指那些數(shù)據(jù)量比較大的對象,這些對象在移動(dòng)兩端交換時(shí),交互通道將不占用與接入服務(wù)器的連接通道,而是通過nginx傳送到平臺內(nèi)部; 同樣接收者獲取二進(jìn)制數(shù)據(jù)也是通過nginx獲取。這種請求是HTTP的。 這里整理的是如何在平臺部署 【負(fù)載均衡的集群的分布式的文件服務(wù)】 nginx : http服務(wù),提供反向代理和負(fù)載均衡服務(wù)(集群可用DNS或考慮LVS方案) mongodb+gridfs : 用于文件服務(wù)提供,其內(nèi)置gridfs提供了分布式,海量存儲的方案 gevent+webpy : nginx直接讀取gridfs是不合適的,配置了cgi才能完成特定功能,這里使用webpy,比django更輕更好用。 webpy的作用是接收到上傳和下傳文件的請求,讀寫gridfs文件內(nèi)容給移動(dòng)端。 gevent是高效的通信框架,雖然單線程工作,但性能非常的好; 用好gevent關(guān)鍵在與外部的io必須全部都是異步的,例如: 數(shù)據(jù)庫,文件磁盤訪問等等。 mongodb對gevent已經(jīng)支持,gevent對webpy,django,psycopg2支持也相當(dāng)?shù)暮茫砸峁﹚ebservice服務(wù)那就考慮用gevent+webpy或django把,性能是杠杠的,比 apache+mod_wsgi要好很多 ,而且gevent是進(jìn)程內(nèi)的不同的HTTP REQUEST可以是共享數(shù)據(jù)的,這一點(diǎn)非常誘惑(apache+mod_wsgi的REQUEST可是隔離的哦!除非您通過redis的PUB/SUB實(shí)現(xiàn)兩個(gè)REQUEST的通信) 關(guān)注的問題: 1.下傳大文件時(shí)的處理 如果直接用nginx當(dāng)然沒有這個(gè)問題 ,但用webpy讀取文件返回HttpResponse時(shí)問題來了,總不至于讀取整個(gè)文件,然后再return。 這種方式在php有flush方法,python只能用yield來做 2.上傳大文件時(shí)的處理 當(dāng)接收到http的文件POST請求時(shí),文件已經(jīng)全部緩存到web服務(wù)器,如果同時(shí)幾千個(gè)文件上傳在進(jìn)行,服務(wù)器就會(huì)被擠爆,這也是很多網(wǎng)站不允許大文件上傳的緣故吧。關(guān)于這個(gè)問題,我想就需要修改一下webpy關(guān)于文件上傳的處理代碼了,將接收到的文件數(shù)據(jù)以流的形式寫入到gridfs里去作為臨時(shí)文件被緩存,等完全接收文件時(shí),才通知到handler代碼,這樣必定高效很多(新的問題又來了,會(huì)不會(huì)把gridfs搞爆掉! 處理時(shí)考慮延時(shí)緩存提交gridfs把)。 BUF_SIZE = 262144 class download: def GET(self): file_name = 'file_name' file_path = os.path.join('file_path', file_name) f = None try: f = open(file_path, "rb") webpy.header('Content-Type','application/octet-stream') webpy.header('Content-disposition', 'attachment; filename=%s.dat' % file_name) while True: c = f.read(BUF_SIZE) if c: yield c else: break except Exception, e: print e yield 'Error' finally: if f: f.close()
links: http://api.mongodb.org/python http://webpy.org/cookbook/storeupload.zh-cn http://webpy.org/cookbook/streaming_large_files http://gevent.org 下份代碼 demo很值得看哦 gevent 1.0 由libev 替換了libevent
摘要: 貼代碼 Code highlighting produced by Actipro CodeHighlighter (freeware)http://www.CodeHighlighter.com/--> 1 #--coding:utf-8-- 2 3 ... 閱讀全文
多樣的文本消息 ----------------- struct MimeText_t{ int type; string text; }; MimeText_t 可以包含普通的文本、圖像和音頻文件的id 圖像和音頻數(shù)據(jù)發(fā)送到服務(wù)器,服務(wù)器并不直接將數(shù)據(jù)發(fā)送到接收者,而是發(fā)送 音頻和圖像的描述uri信息 接收者解釋json,顯示text文本,讀取emoticon編號,顯示表情圖片; image,audio則顯示占位(如果當(dāng)前wifi可用,則自己自動(dòng)加載image和audio資源) ,如果非wifi信號則待用戶點(diǎn)擊此占位,然后從服務(wù)器請求image和audio資源到本地。 文本描述: 字體大小,顏色,文本link,表情符號 文本用json組織 , { set:[ text:{text:'this is',bg-color:#ff0000,color:#ffffff,font-name:'arial',font-size:20,bold:true,italic:true}, text:{text:'shanghai',color:#ff0000,font-name:'arial',font-size:20,bold:true,italic:true,link:'http://sw2us.com/images/shanghai.png'}, image:{id:1001,width:200,height:200,uri:'http://sw2us.com/images/bear.png'}, audio:{id:2001,duration:5,uri:'http://sw2us.com/clips/a001.mp3'}, location:{lon:121.221,lat,time,speed,direction,text:'立月路2001號浦星公路口'}, emoticon:{id:201} ], } 屬性名簡化: --------------------- ----------------------- 1 - text [ 1: text , 2: bg-color , 3: color , 4: font-name, 5:font-size, 6:bold, 7:italic ] 2 - image [ 1: id , 2:width , 3:height , 4:uri] 3 - audio [ 1:id , 2:duration,3:uri] 4 - location [ 1:lon, 2:lat, 3:time, 4:speed, 5:direction, 6:text] 5 - emoticon [ 1: id ] ----------------------- 0 - false 1 - true
接口定義: 1 interface IAuthServer{ 2 CallReturn_t userAuth(string user,string passwd,int device_type); 3 CallReturn_t registerUser(UserRegisterInfo_t reginfo); // tested 4 }; 定義認(rèn)證服務(wù)器接口,userAuth()返回認(rèn)證用戶的token 接口服務(wù)實(shí)現(xiàn): 1 import os,os.path,sys,struct,time,traceback,signal,threading,copy,base64 2 import datetime,base64 3 4 from datetime import datetime 5 from base import * 6 import tcelib as tce 7 from showbox import * 8 import utils.misc 9 import utils.config 10 import utils.cipher 11 12 13 os.environ.setdefault("DJANGO_SETTINGS_MODULE", "database.showbox.showbox.settings") 14 15 from django.db import connection 16 from django.db.models import Sum 17 from django.db import transaction 18 # import sns.core.models as cm 19 import database.showbox.core.models as core 20 import database.showbox.nosql.models as nosql 21 22 class AuthServerImpl(IAuthServer): 23 def __init__(self,app): 24 IAuthServer.__init__(self) 25 self.app = app 26 27 def userAuth(self, user, passwd, device_type, ctx): 28 cr = CallReturn_t() 29 try: 30 r = core.User.objects.get(user=user,passwd=passwd) 31 userinfo = { 32 "id":r.id, 33 "user":user, 34 "name":r.name, 35 "login_time":int(time.time()), 36 "user_type":SnsConsts.Authorized_User 37 } 38 token = utils.cipher.encryptToken(userinfo) 39 cr.value = token 40 except: 41 print traceback.format_exc() 42 cr = CallReturn_Error() 43 return cr 44 45 def registerUser(self, reginfo, ctx): 46 return IAuthServer.registerUser(self, reginfo, ctx) 47 48 49 50 class ServerApp: 51 def __init__(self): 52 pass 53 54 def getConfig(self): 55 #return self.app.getConfig() 56 pass 57 58 _handle = None 59 @classmethod 60 def instance(cls): 61 if cls._handle == None: 62 cls._handle = cls() 63 return cls._handle 64 65 def run(self): 66 tce.RpcCommunicator.instance().init('authserver').initMessageRoute('./services.xml') 67 server = tce.RpcCommunicator.instance().currentServer().findEndPointByName('mq_authserver').impl 68 adapter = tce.RpcAdapterEasyMQ.create('server',server) 69 #沒有主動(dòng)發(fā)送消息的情形 70 servant = AuthServerImpl(self) 71 adapter.addServant(servant) 72 tce.RpcCommunicator.instance().waitForShutdown() 73 74 if __name__ == '__main__': 75 ServerApp.instance().run() 服務(wù)器很簡單,實(shí)現(xiàn)接口IAuthService的功能函數(shù),定義一個(gè)ServerApp,然后運(yùn)行 客戶調(diào)用測試: 1 from datetime import datetime 2 from base import * 3 import tcelib as tce 4 from showbox import * 5 import utils.misc 6 import utils.config 7 import utils.cipher 8 9 10 def userAuthResult(result,prx): 11 print result 12 13 # queue:client 必須在調(diào)用服務(wù)器的write 隊(duì)列mq 14 communicator =tce.RpcCommunicator.instance().init() 15 conn = tce.RpcConnectionEasyMQ.create('127.0.0.1',12301,'queue:mq_authserver') 16 local = tce.RpcConnectionEasyMQ.create('127.0.0.1',12301,'queue:mq_test_client',tce.AF_READ) 17 conn.setLoopbackMQ(local) 18 19 20 prx = IAuthServerPrx(conn) 21 prx.userAuth_async('test','111111',1,userAuthResult) #異步調(diào)用 22 print prx.userAuth('test','111111',1) #同步調(diào)用 21,22行分別測試兩種調(diào)用模式 client與server通過EasyMQ進(jìn)行傳遞 easyMQ是個(gè)最簡單的消息隊(duì)列實(shí)現(xiàn)
在tce構(gòu)架的平臺系統(tǒng)中,采集的用戶位置gps信息從網(wǎng)關(guān)gatewayserver接收并通過mq_gps消息隊(duì)列存儲到多個(gè)位置單元服務(wù)器 LocationUnit, 系統(tǒng)中存在若干個(gè)LocationServer提供查詢功能,當(dāng)一次位置查詢時(shí),LocationServer對集群的LocationUnit進(jìn)行Map Reduce計(jì)算
idl的保留關(guān)鍵字:'byte','bool','short','int','long','float','double','string' ,均不能用于定義module,class,interface和變量名稱 定義的變量名稱如果包含以下單詞:'def','import','from','type','str','int','float','class' , tce生成python代碼時(shí)自動(dòng)給添加'_'后綴,比如: struct xx{ string name; string from; } xx結(jié)構(gòu)的from變量名將生成from_ 接口定義: module test{ dictionary<string,string> Properties_t; sequence<string> IpAddressList_t; interface ITerminal{ void onGetServerMessage(string text); } interface Server{ IpAddressList_t getIpAddresses(); Properties_t getProperties(); void ping(string fromhost); string login(string user,string passwd,ctx); }; } struct: tce將結(jié)構(gòu)struct映射為class對象 ,初始化成員變量并創(chuàng)建散列函數(shù) marshall/unmarshall sequence<T>: tce將數(shù)組類型直接映射為[] 例如 : dictionary<K,V> tce將字典映射為 {} python實(shí)現(xiàn)Server接口的getIpAddresses()方法: def getIpAddresses(): return ['192.168.14.101','192.168.12.50'] 定義服務(wù)器接口實(shí)現(xiàn): tce為interface生成接口基類: class Server 我們提供一個(gè)實(shí)現(xiàn)類 : class ServerImpl(Server): def __init__(self): Server.__init__(self) def getIpAddresses(self,ctx): return [] 在這里我們提供了ServerImpl類,然后編寫實(shí)現(xiàn)函數(shù)getIpAddresses. 每個(gè)接口函數(shù)都攜帶ctx參數(shù),ctx攜帶rpc請求的附屬信息,比如: 外帶數(shù)據(jù)(dict),底部的連接對象 等等 。 服務(wù)接口被稱為一個(gè)服務(wù)類servant ,接下來演示如何將這個(gè)servant裝配并提供客戶。 tce.RpcCommunicator.instance().init() ep = tce.RpcEndPoint(host='127.0.0.1',port=16005) 定義一個(gè)通信端點(diǎn) adapter = tce.RpcCommunicator.instance().createAdapter('first_server',ep) 創(chuàng)建一個(gè)通信適配器 servant = ServerImpl() 創(chuàng)建服務(wù)接口對象 adapter.addServant(servant) 添加進(jìn)適配器 tce.RpcCommunicator.instance().waitForShutdown() 進(jìn)入通信循環(huán) 調(diào)用服務(wù): tce.RpcCommunicator.instance().init() prx = test.ServerProxy.create(127.0.0.1,16005) ips = prx.getIpAddresses() 多種呼叫模式: tce將接口函數(shù)自動(dòng)生成 normal,oneway,async三種調(diào)用接口方法 ,rpc調(diào)用出現(xiàn)異常,底部將拋出異常,所以用戶需要異常捕獲。 1.normal: 原型: fun_name(參數(shù)..,timeout=0,extra=None) 調(diào)用函數(shù)自動(dòng)添加timeout,extra參數(shù)。timeout默認(rèn)為0,將自動(dòng)采用tce默認(rèn)的30s等待調(diào)用返回時(shí)間; extra 指此次調(diào)用攜帶的附屬數(shù)據(jù),extra ={'name':'scott','age':100} extra數(shù)據(jù)在服務(wù)端接口函數(shù)的ctx中獲取: ctx.msg.extra 函數(shù)調(diào)用時(shí)將阻塞客戶線程,直到timeout超時(shí)或者服務(wù)器數(shù)據(jù)返回 2. oneway fun_name_oneway(參數(shù)...,extra=None) 只有類型void的接口函數(shù)才會(huì)生成oneway調(diào)用方法.oneway調(diào)用不會(huì)阻塞用戶線程,通常用于單向傳輸?shù)膱鼍埃?nbsp;Server接口的ping()函數(shù) 3. async fun_name_async(參數(shù),async_callback,extra=None) 異步調(diào)用模式不會(huì)阻塞客戶線程,async_callback指定了rpc調(diào)用的返回接收函數(shù) 接收函數(shù)原型: void fun_name_CallBack(result,proxy) 例如: def getIpAddressesResult(result,proxy): print result #result - IpAddressList_t prx.getIpAddresses_async(getIpAddressesResult) *連接復(fù)用 在互聯(lián)網(wǎng)應(yīng)用場景,服務(wù)器將接入大量的客戶端設(shè)備,客戶端是不能被尋址,所以服務(wù)器要完成推送消息給客戶端,必須在客戶端建立的連接上反向傳輸。 tce使這個(gè)工作變得相當(dāng)簡單: 1. 客戶端定義接收消息的接口 ITerminal,定義接收函數(shù)onGetServerMessage() class TermnialImpl(ITerminal): ... 2. 創(chuàng)建到服務(wù)器的連接代理 tce.RpcCommunicator.instance().init() 3. 添加服務(wù)類實(shí)現(xiàn) adapter = tce.RpcCommAdapter('adapter') impl = TerminalImpl() adapter.addConnection(prx.conn) adapter.addServant(impl) 加到通信器對象 3. 請求一次調(diào)用 prx.login('scott','1234') 4. 服務(wù)器端反向調(diào)用ITerminal的onGetServerMessage() def login(self,user,passwd,ctx): prx = ITerminalProxy(ctx.conn) prx.onGetServerMessage('server message..') 完成一次對設(shè)備端的接口調(diào)用
同樣在函數(shù)中連接pgsql,然后執(zhí)行500次查詢, 測試gevent模式、串行查詢、多線程查詢 數(shù)據(jù)如下: multithread_test cost time: 2.45199990273 normal_test cost time: 4.04299998283 gevent_test cost time: 2.12800002098 結(jié)果 串行最慢4.4s, 多線程 2.45s ,gevent最快2.12 ,yes! 測試代碼: 1 import gevent 2 import gevent.queue 3 4 import psycopg2 5 import psycopg2.extensions 6 7 import psycogreen.gevent 8 9 psycogreen.gevent.patch_psycopg() 10 11 sys.path.insert(0,'../') 12 13 import easymq 14 15 ''' 16 在同一線程中,同一個(gè)連接conn上兩次創(chuàng)建的cur將會(huì)是一樣滴,因?yàn)槭钱惒絯ait_read()緣故 17 所以要么每次創(chuàng)建數(shù)據(jù)庫連接,要么使用dbpool 18 ''' 19 20 21 def readThread(): 22 conn = psycopg2.connect(database='postgres',user='postgres',password='111111') 23 24 # cur = conn.cursor(cursor_factory=psycopg2.extensions.DictCursor) 25 cur = conn.cursor(cursor_factory=psycopg2.extensions.cursor) 26 27 # cur.execute("select pg_sleep(%s)", (2,)) 28 for n in range(10): 29 cur.execute("select CURRENT_DATE") 30 # print cur.fetchone() 31 # print 'read end..' 32 conn = None 33 34 35 def gevent_test(): 36 jobs=[] 37 for n in range(100): 38 jobs.append(gevent.spawn(readThread)) 39 gevent.joinall(jobs) 40 41 def normal_test(): 42 for n in range(100): 43 readThread() 44 45 def multithread_test(): 46 threads=[] 47 for n in range(100): 48 thread = threading.Thread(target=readThread) 49 threads.append(thread) 50 thread.start() 51 for thread in threads: 52 thread.join() 53 54 start = time.time() 55 normal_test() 56 end = time.time() 57 print 'normal_test cost time:',end-start 58 59 start = time.time() 60 gevent_test() 61 end = time.time() 62 print 'gevent_test cost time:',end-start 63 64 # start = time.time() 65 # multithread_test() 66 # end = time.time() 67 # print 'multithread_test cost time:',end-start 68
搞了這么久的RPC通信框架TCE,完成java,c++,python,javascript,actionscript之間的互相調(diào)來調(diào)去,感覺很舒服。
作為移動(dòng)應(yīng)用平臺,海量并發(fā)和高效傳輸是首要考慮要點(diǎn)。 市面上充值著都差不多的解決技術(shù)方案,無非那些 webserver+db ngnix+webserver+mq+logic-server ngnix+gevent-wsgi+db webapi已經(jīng)被高舉到不可超越的地步
而我,不走尋常路,我得另辟捷徑 -http的效率根本無法跟socket的長連接媲美 -服務(wù)器是需要反向推送消息到移動(dòng)設(shè)備的 -操作接口是簡單的易擴(kuò)展的,屏蔽掉通信細(xì)節(jié) -支持htm5的websocket,支持java,支持python,支持python客戶端調(diào)用
那我的方案是tce為基礎(chǔ)的RPC框架平臺,拋棄那些xmls,json,讓開發(fā)者從無盡的網(wǎng)絡(luò)編解碼工作中脫離出來,不用考慮多種通信模式,同步和異步。 font-gate : 前端接入服務(wù)器 easymq : 平臺服務(wù)總線消息隊(duì)列 logic-service : 不同的邏輯服務(wù)器
設(shè)想,在android手機(jī)上java代碼調(diào)用函數(shù) whats_yourname(), 這個(gè)函數(shù)并不在本地,而是存在遠(yuǎn)端平臺內(nèi)部的一個(gè)服務(wù)器上,調(diào)用并被執(zhí)行返回'scott'到手機(jī)終端,這是多么令人快樂的事情,用戶不用關(guān)心消息如何被列集,如何被分派,這一切都是透明的。 同樣,服務(wù)器主動(dòng)推送商品打折信息到手機(jī)上,服務(wù)器僅僅需要調(diào)用手機(jī)接收函數(shù),并填寫要傳輸?shù)膮?shù)即可。 其實(shí),這些就是RPC的實(shí)現(xiàn),這樣的東東到處都是,DCOM,CORBA,ICE,只是我做得更加靈活
總是想做些令人輕松并快樂的事情!
gevent作為一款優(yōu)秀的網(wǎng)絡(luò)通信框架,其出色的性能得到大家一致認(rèn)可,但在處理并行任務(wù)的時(shí)候也要注意很多問題,不然您的服務(wù)器將變得異常緩慢。 http://blog.163.com/lxl_1995/blog/static/677173392012724103742746/ 這篇博文講的非常清楚,建議讀一下 gevent的特點(diǎn)如下: 1. 單線程執(zhí)行,所有協(xié)程都在同一進(jìn)程中被模擬和調(diào)度分派 2. 可以創(chuàng)建成千上萬的 協(xié)程,而不會(huì)受任何性能影響 3. 由于spawn的協(xié)程不是os分配和管理,所以不會(huì)有額外的線程資源分配,cpu也不用在這些線程之間調(diào)度切換 4. 單線程執(zhí)行,無需考慮資源互斥 5. 協(xié)程之間切換是通過gevent的io阻塞完成,例如 gevent.sleep(0), queue.get/put,event,socket.... 每調(diào)用一次gevent 的api,gevent就能獲得一次schedule的機(jī)會(huì)(這很類似操作系統(tǒng)的用戶調(diào)用中斷,由用戶態(tài)切換到內(nèi)核態(tài)) 以上特點(diǎn)保證gevent的性能非常出色,但當(dāng)我們的server用到第三方軟件包的時(shí)候那要非常小心了,特別是這些包內(nèi)部涉及了io操作。 如果第三方軟件包是純python的那很簡單,只需要gevent.monkey_patch(xxx)就okay; 但如果包是擴(kuò)展clib,那要當(dāng)心了,monkey_patch 并不能將其相關(guān)io操作打上補(bǔ)丁,為了使用這些第三方軟件包,要求這些軟件包必須支持 協(xié)程異步 接口(調(diào)用其同步io接口,將阻塞住gevent的執(zhí)行線程,那gevent就完蛋了)。 gevent的patch對psycopg2無效,因?yàn)閜sycopg2的通信部分是c接口的函數(shù)庫,還好psycopg2內(nèi)部支持協(xié)程,需要使用 到 psycogreen 這個(gè)東東 psycogreen.gevent.patch_psycopg() 支持協(xié)程
之后的在gevent的線程中執(zhí)行sql并等待數(shù)據(jù)返回時(shí),gevent立馬將執(zhí)行切換到另外的線程 gvent項(xiàng)目中會(huì)用到各種諸多的第三方庫,必須要求這些庫的io接口不能是阻塞的,也就是能支持到gevent異步模式 應(yīng)用邏輯代碼在被執(zhí)行時(shí)(無系統(tǒng)api呼叫),單線程比多線程執(zhí)行速度更快。循環(huán)執(zhí)行一段計(jì)算二次函數(shù)代碼,由于期間沒有系統(tǒng)api調(diào)用,os不能進(jìn)行內(nèi)核tasklet切換,所以導(dǎo)致cpu的峰值可以攀升到90%,直到硬件、時(shí)鐘等中斷產(chǎn)生,強(qiáng)行切換到其他線程。 多核心cpu表現(xiàn)為單個(gè)核始終異常的忙碌,其他幾個(gè)比較空閑。
easymq 用于替代qpid的消息中間件。通信基礎(chǔ)采用tce引擎,提供topic和queue兩種隊(duì)列。 mq服務(wù)器啟動(dòng)加載mq條目,建立mq內(nèi)存對象,提供認(rèn)證,客戶程序連接時(shí)指定mq名稱和認(rèn)證口令, 管理程序可以動(dòng)態(tài)增加、刪除和監(jiān)視隊(duì)列。 mq持久化支持,根據(jù)創(chuàng)建參數(shù)控制durable。 easymq第一個(gè)版利用可以用python實(shí)現(xiàn),之后考慮資源利用和系統(tǒng)會(huì)用c++實(shí)現(xiàn) easymq是tce一個(gè)很好的應(yīng)用。 定位夠輕,夠簡單,暫不考慮負(fù)載均橫和自動(dòng)路由。 實(shí)例化mq服務(wù)器 1 def start(self): 2 tce.RpcCommunicator.instance().init('easymq.server') 3 ep = tce.RpcEndPoint(host=self.default_host,port=self.default_port) 4 adapter = tce.RpcCommunicator.instance().createAdapter('first',ep) 5 servant = self 6 adapter.addServant(servant) 7 print 'wait for shutdown..' 8 tce.RpcCommunicator.instance().waitForShutdown() 1 server = Server.instance() 2 print 'easymq server launched..' 3 server.init().start() 接收消息 1 import easymq 2 3 def readThread(conn): 4 while True: 5 m = conn.read( ) 6 print 'got one:',m 7 8 if __name__=='__main__': 9 easymq.init() 10 conn = easymq.Connection(('127.0.0.1',12301),'test',mode=easymq.READWRITE) 11 conn.open() 12 readThread(conn) 發(fā)送消息到接收者 1 import easymq 2 3 if __name__=='__main__': 4 easymq.init() 5 6 conn = easymq.Connection(('127.0.0.1',12301),'test',mode=easymq.WRITE) 7 conn.open() 8 for n in range(100): 9 conn.write(str(n)*10) 10 # waitForShutdown() 11 gevent.sleep(2)
|