• <ins id="pjuwb"></ins>
    <blockquote id="pjuwb"><pre id="pjuwb"></pre></blockquote>
    <noscript id="pjuwb"></noscript>
          <sup id="pjuwb"><pre id="pjuwb"></pre></sup>
            <dd id="pjuwb"></dd>
            <abbr id="pjuwb"></abbr>

            socketref,再見!高德

            https://github.com/adoggie

              C++博客 :: 首頁 :: 聯(lián)系 :: 聚合  :: 管理
              246 Posts :: 4 Stories :: 312 Comments :: 0 Trackbacks

            常用鏈接

            留言簿(54)

            我參與的團(tuán)隊(duì)

            搜索

            •  

            最新評(píng)論

            閱讀排行榜

            評(píng)論排行榜

            耗時(shí)1天,根據(jù)公司應(yīng)用需求,開發(fā)一種簡易的基礎(chǔ)的通信框架,簡化系統(tǒng)其它模塊在網(wǎng)絡(luò)通信工作方面的復(fù)雜度
            簡單測試network.py service 做服務(wù)器,network.py client做客戶端,傳送多個(gè)消息報(bào)文,且能響應(yīng)sock連接和斷開狀態(tài)
            考慮到性能和實(shí)際項(xiàng)目對(duì)線程需求,故都采用單連接單線程模式,預(yù)留select多路復(fù)用接口,可見: service.selectIn()


              1 # -- coding:utf-8 --
              2 # network.py 
              3 # bin.zhang@sw2us.com 
              4 # 2012.3.8
              5 # revision:  
              6 #
              7 
              8 import socket,traceback,os,os.path,sys,time,struct,base64,gzip,array,threading
              9 import select
             10 
             11 import message
             12 from message import * 
             13 
             14 global_compress_type = COMPRESS_ZLIB
             15     
             16 NETMSG_ERROR_MAGIC = 1
             17 NETMSG_ERROR_SIZE = 2
             18 NETMSG_ERROR_DECOMPRESS =3
             19 NETMSG_ERROR_NOTSUPPORTCOMPRESS = 4
             20 NETMSG_ERROR_MESSAGEUNMARSHALL = 5
             21         
             22 #NetPacketQueue 處理消息分解
             23 class NetPacketQueue:
             24     def __init__(self,conn = None,size= 1024):
             25         self.size = size
             26         self.outs={}
             27         self.ins={}
             28         self.user=None
             29         self.conn = conn
             30         self.bf=''
             31         self.pktlist=[] #解出來的消息
             32         self.mtxptks=threading.Condition()
             33         self.invalid = False 
             34     
             35     def clearPackets(self):
             36         self.mtxptks.acquire()
             37         self.pktlist=[]
             38         self.mtxptks.release()
             39         
             40     def destroy(self):
             41         self.invalid = True
             42         self.mtxptks.acquire()
             43         self.mtxptks.notify()
             44         self.mtxptks.release()
             45 
             46 
             47     # def getMessage(self):
             48         # m = None
             49         # if self.invalid: 
             50             # return m            
             51         # self.mtxptks.acquire()
             52         # if self.pktlist:
             53             # m = self.pktlist[0]
             54             # del self.pktlist[0]
             55         # else:
             56             # self.mtxptks.wait(0.5)
             57             # if self.pktlist:
             58                 # m = self.pktlist[0]
             59                 # del self.pktlist[0]
             60         # self.mtxptks.release()
             61         # return m
             62 
             63     def getMessageList(self):
             64         m =[]        
             65         self.mtxptks.acquire()
             66         m = self.pktlist
             67         self.pktlist=[]
             68         self.mtxptks.release()
             69         return m
             70         
             71     '''
             72         @return: false - 臟數(shù)據(jù)產(chǎn)生 
             73     '''
             74     def dataQueueIn(self,d):
             75         rc = (True,2)
             76         self.bf+=d
             77         d = self.bf
             78         while True:            
             79             hdrsize = NetMetaPacket.minSize()
             80             #print hdrsize,len(d)
             81             if len(d)<NetMetaPacket.minSize():                
             82                 rc = True,0 #數(shù)據(jù)不夠,等待
             83                 break
             84             magic,size,compress,encrypt,ver = struct.unpack('!IIBBI',d[:hdrsize])
             85             if magic != NetMetaPacket.magic4:
             86                 return False, NETMSG_ERROR_MAGIC#
             87             if size<=10:
             88                 return False,NETMSG_ERROR_SIZE
             89             if len(d)< size+4:
             90                 rc = True,1 #數(shù)據(jù)不夠
             91                 break
             92             size-=10
             93             #print size,compress,encrypt,ver
             94             s = d[hdrsize:hdrsize+size]
             95             d = d[hdrsize+size:]
             96             if compress == message.COMPRESS_ZLIB:
             97                 try
             98                     s = zlib.decompress(s)
             99                 except:
            100                     return False,NETMSG_ERROR_DECOMPRESS
            101             elif compress != message.COMPRESS_NONE:
            102                 return False,NETMSG_ERROR_NOTSUPPORTCOMPRESS
            103             # restore to NetMetaPacket
            104             #MessageBase
            105             m = MessageBase.unmarshall(s)
            106             if m == None:
            107                 return False,NETMSG_ERROR_MESSAGEUNMARSHALL
            108             self.mtxptks.acquire()
            109             self.pktlist.append(m)
            110             self.mtxptks.notify()
            111             self.mtxptks.release()
            112         self.bf = d
            113         return rc
            114     
            115 class NetConnThread:
            116     def __init__(self,conn,proc=None):
            117         self.conn = conn
            118         if not proc:
            119             proc = self.inner
            120         self.thread = threading.Thread(target=proc)
            121         self.thread.start()
            122         
            123     def inner(self):
            124         while True:        
            125             try:
            126                 d = self.conn.recvData()
            127                 if not d:
            128                     self.conn.close()
            129                     break            
            130                 self.conn.eventDataRecv(d)        
            131             except:
            132                 self.conn.close()
            133                 break
            134         self.conn.eventDestroyed()
            135         print 'NetConnThread Exiting'
            136 
            137 class NetConnectionEvent:
            138     EVENT_CONNECTED=1
            139     EVENT_DATA=2
            140     EVENT_DISCONNECTED=3
            141     def __init__(self,type,conn,data=None):
            142         self.type = type
            143         self.conn = conn
            144         self.data = data
            145         
            146 class NetConnection:
            147     def __init__(self,sock=None,svc=None,recvfunc=None):
            148         self.service =svc
            149         self.sock = sock
            150         self.delta = None
            151         self.recvfunc = recvfunc        
            152         self.queue = NetPacketQueue(self)
            153     
            154     def getService(self):
            155         return self.service
            156         
            157     def getQueue(self):
            158         return self.queue
            159         
            160     def peer(self):
            161         pass
            162     
            163     def connect(self,dest):
            164         self.sock = socket.socket()
            165         try:            
            166             self.sock.connect(dest)
            167         except:
            168             return False
            169         return True
            170     '''    
            171     def setDataRecvFunc(self,funcRecv):
            172         recvfunc = funcRecv
            173     '''
            174     def eventDataRecv(self,data):
            175         r = False        
            176         r = self.queue.dataQueueIn(data)
            177         if not r:
            178             self.close() #數(shù)據(jù)解碼錯(cuò)誤,直接關(guān)閉連接
            179             return 
            180         msglist = self.queue.getMessageList()
            181         if len(msglist) == 0:
            182             return 
            183         #直接將數(shù)據(jù)拋給service接收處理 
            184         if self.service:
            185             self.service.eventGotMessage( msglist ,self ) #由到這里將消息直接彈射給用戶,需要獨(dú)立的或多個(gè)線程做支持
            186         #如果是無service的connection對(duì)象接收數(shù)據(jù)需要構(gòu)建一個(gè)thread對(duì)象,且在另外線程調(diào)用 conn.queue.getMessage()獲取消息包
            187         if self.recvfunc:
            188             evt = NetConnectionEvent(NetConnectionEvent.EVENT_DATA,self,msglist)
            189             self.recvfunc(evt)
            190         
            191     def recvData(self,size=1024):
            192         return self.sock.recv(size)
            193         
            194     def sendData(self,d):
            195         pass
            196     
            197     def sendMessage(self,m):
            198         try:
            199             d = NetMetaPacket(msg=m,compress=global_compress_type).marshall()
            200             self.sock.sendall(d)            
            201         except:
            202             self.close()
            203             #traceback.print_exc()
            204             return False
            205         return True
            206             
            207     def close(self):    
            208         try:
            209             self.sock.close()
            210         except:pass
            211     
            212     def eventDestroyed(self):
            213         self.queue.destroy()    
            214         if self.service:
            215             self.service.eventConnDisconnected(self)
            216             
            217         if self.recvfunc:
            218             evt = NetConnectionEvent(NetConnectionEvent.EVENT_DISCONNECTED,self)
            219             self.recvfunc(evt)
            220         
            221 class NetService:
            222     def __init__(self,name,addr):
            223         self.name = name
            224         self.addr = addr
            225         
            226         self.condexit = threading.Condition()
            227         self.sock = None
            228         self.mtxconns = threading.Lock()
            229         self.conns=[]
            230         
            231         
            232     def eventConnCreated(self,conn):
            233         print 'conn created'
            234         self.mtxconns.acquire()
            235         self.conns.append(conn)
            236         self.mtxconns.release()
            237     
            238     def eventConnDisconnected(self,conn):
            239         print 'conn disconnected'
            240         self.mtxconns.acquire()
            241         self.conns.remove(conn)
            242         self.mtxconns.release()
            243     
            244     #service模式下接收的消息從這里冒上來
            245     # conn - 從哪個(gè)連接上接收的數(shù)據(jù)
            246     def eventGotMessage(self,msglist,conn):
            247         pass
            248     
            249     #將連接設(shè)置為select模式
            250     def selectConnIn(self,conn):
            251         pass
            252     
            253     def getConnections(self):
            254         pass
            255     
            256     def start(self):
            257         try:
            258             
            259             self.sock = socket.socket()
            260             print 'lll',self.addr
            261             self.sock.bind(self.addr)            
            262             self.sock.listen(5)
            263             
            264             self.thread = threading.Thread(target=self.service_loop)
            265             self.thread.start()
            266             
            267         except:
            268             traceback.print_exc()
            269             return False
            270         
            271     def shutdown(self):
            272         self.sock.close()
            273         self.mtxconns.acquire()
            274         for c in self.conns:
            275             c.close()
            276         self.mtxconns.release()
            277     
            278     def service_loop(self):
            279         print 'service:(%s) thread starting'%self.name
            280         while True:
            281             fdr = []
            282             fdr.append(self.sock)
            283             infds,wr,e = select.select(fdr,[],[])
            284             if e:
            285                 print 'service thread exit'
            286                 break
            287             for s in infds:
            288                 if s == self.sock: #新連接到達(dá) 
            289                     sock = None
            290                     try:
            291                         sock,peer = self.sock.accept()    #異常產(chǎn)生表示self.sock被強(qiáng)行關(guān)閉                
            292                     except
            293                         print 'service:(%s) thread exiting'%self.name
            294                         return 
            295                     conn = NetConnection(sock,self)
            296                     sock.delta['conn'= conn                    
            297                     self.eventConnCreated(conn)                    
            298         ##
            299         
            300         #self.condexit.nofity()
            301         
            302         
            303 class NetworkServer:
            304     def __init__(self,name=''):
            305         self.name = name
            306         self.services={}
            307         setattr(socket.socket,'delta',{})
            308         setattr(socket.socket,'conn',None)
            309 
            310     def createService(self,name,addr,port,servicecls=NetService):
            311         svc = serviccls(name,addr,port)
            312         self.services[name]=svc
            313         return svc
            314     
            315     def addService(self,serv):
            316         self.services[serv.name]=serv
            317     
            318 
            319         
            320         
            321 def test_packetqueue():
            322     q = NetPacketQueue()
            323     for n in range(3*10):
            324         d = NetMetaPacket(msg=MsgCallReturn(value=[n],bin='A'*(n+1) ),compress=COMPRESS_ZLIB).marshall()
            325         q.dataQueueIn(d)
            326 '''    
            327     while True:
            328         m = q.getMessage()
            329         if not m:
            330             break
            331         print m.attrs,m.bin
            332 '''
            333 
            334 class MyService(NetService):
            335     def __init__(self,name,addr):
            336         NetService.__init__(self,name,addr)
            337     
            338     def eventConnCreated(self,conn):
            339         NetService.eventConnCreated(self,conn)
            340         #print 'client connection created!',conn
            341         thread = NetConnThread(conn)
            342         
            343     def eventGotMessage(self,msglist,conn):
            344         for m in msglist:
            345             print m.attrs,m.bin
            346         
            347 test_dest = ('localhost',12004)
            348 
            349 class MyClient:
            350     def __init__(self):
            351         conn = NetConnection(recvfunc = self.recvEvent)
            352         r = conn.connect( test_dest)
            353         thread = NetConnThread(conn)
            354         for n in range(100):
            355             if not conn.sendMessage(MsgCallReturn(value=[n],bin='A'*(n+1) )):
            356                 print 'serivce lost  abord!!'
            357                 break
            358             time.sleep(1)
            359             
            360     def recvEvent(self,evt):
            361         evt.conn == 1
            362         if evt.type == NetConnectionEvent.EVENT_DATA:
            363             for m in evt.data:
            364                 print m.attrs,m.bin
            365         if evt.type == NetConnectionEvent.EVENT_DISCONNECTED:
            366             print 'connection lost!'
            367             
            368 def test_service():
            369     server = NetworkServer('test-server')
            370     svc =MyService('fileserver',test_dest)
            371     #svc = server.createService("filesync-server",'localhost',12001,MyService)
            372     server.addService(svc)
            373     svc.start()
            374     #time.sleep(5)
            375     #svc.shutdown()
            376 
            377 def test_client():
            378     MyClient()
            379     
            380 if __name__=='__main__':
            381     #test_packetqueue()
            382     p = sys.argv[1]
            383     if p=='client':
            384         test_client()
            385         sys.exit(0)
            386     if p=='service':
            387         test_service()
            388         
            389     time.sleep(100)
            posted on 2012-03-08 23:12 放屁阿狗 閱讀(2117) 評(píng)論(0)  編輯 收藏 引用 所屬分類: OpenSource開源工程perl/python/php/lua/tclWebGisWebService
            香蕉久久久久久狠狠色| 久久国产欧美日韩精品免费| 久久国产成人亚洲精品影院| 久久亚洲AV无码精品色午夜 | 青青草原综合久久大伊人导航| 久久精品国产久精国产一老狼| 国产精品久久久天天影视香蕉| 亚洲精品美女久久久久99| 国产精品伦理久久久久久| 久久亚洲AV成人出白浆无码国产| 精品久久久无码中文字幕天天| 精品久久久无码人妻中文字幕 | 日韩精品久久久久久免费| 久久99精品国产麻豆蜜芽| 久久久久99精品成人片试看| 亚洲人成无码网站久久99热国产| 久久福利青草精品资源站免费| 精品国产99久久久久久麻豆| 久久亚洲精品无码观看不卡| 99久久精品国产毛片| 国产精品9999久久久久| 性欧美大战久久久久久久久 | 久久久久无码精品国产不卡| 亚洲va久久久久| 久久无码AV中文出轨人妻| 国产成人久久精品二区三区| 久久免费精品一区二区| 精品久久久久香蕉网| 欧美一区二区三区久久综| 中文字幕热久久久久久久| 久久亚洲精品成人无码网站| 国产精品久久久久久久久软件| 久久亚洲中文字幕精品一区| 久久天天躁狠狠躁夜夜av浪潮| 精品久久久久久国产免费了| 99精品久久久久久久婷婷| 精品久久人人妻人人做精品| 久久久WWW免费人成精品| 久久午夜综合久久| 亚洲国产天堂久久久久久| 亚洲欧美日韩久久精品|