dvr項目中flex駐留在瀏覽器,而影響播放程序是獨立的進程,sandbox的安全問題導致flex的代碼無法直接與播放進程IPC通信,那只有通過公網IP的主機進行橋接。 很多年以前用過foundstone系列的工具,也有socket轉向的功能,包括在5173時做過lsp的底層轉向軟件(仿sockcap),原理當然是相當簡單,python是首選工具。 代碼接收兩端建立socket進來,并根據相同的id號來進行socket配對,之后兩個socket之間就實現互相轉發(技術同之前寫的http代理服務器 ) 1 # -- coding:utf-8 -- 2 3 import socket,traceback,os,os.path,sys,time,struct,base64,gzip,array,threading 4 import select,json 5 6 7 ''' 8 {'id','type'} 9 10 type - 'mapshow','imageplay' 11 id - 一次會話的編號 12 13 imageplay 與xbridge建立socket連接,并注冊一個會話編號(隨機產生) 14 imageplay啟動mapshow,并將會話編號傳遞給mapshow,mapshow建立xbridge的連接,并提交會話編號 15 xbridge將雙向傳遞相同會話編號的數據到對方 16 17 sock1的客戶必須等sock2連接進入之后發送數據,否則將sock1數據轉發給sock2時將產生異常 18 ''' 19 20 class ConnectionPair: 21 def __init__(self,app): 22 self.app = app 23 self.id = '' 24 self.sock1=None #imageplay上來的連接 25 self.sock2=None #第二個連接上來的對象mapdemo 26 27 def start(self): 28 t = threading.Thread(target=self.threadRecv) 29 t.start() 30 31 def onLostConnection(self): 32 try: 33 print 'connection pair lost..' 34 self.sock1.close() 35 self.sock2.close() 36 self.app.onConnectionPairBroken(self) 37 except: 38 traceback.print_exc() 39 40 def threadRecv(self): 41 print 'service threading entering ' 42 import select 43 while True: 44 fds = [] 45 if self.sock1: 46 fds.append(self.sock1) 47 if self.sock2: 48 fds.append(self.sock2) 49 #fds = [self.sock1,self.sock2] 50 try: 51 #sock2未連接進來前,將不接收sock1上產生數據 52 #print 'fds:',len(fds),fds 53 rds,wds,eds = select.select(fds,[],[],1) 54 if not rds:#timeout 55 continue 56 57 for s in rds: 58 d = s.recv(1024) 59 #print d 60 if not d: 61 raise 'any jump' 62 63 to = self.sock2 64 if s == self.sock2: 65 to = self.sock1 66 #print 'redirect data:',d 67 to.sendall(d) 68 except: 69 traceback.print_exc() 70 self.onLostConnection() 71 break 72 73 print 'ConnThread Exiting ' 74 75 76 77 78 class XBridge: 79 def __init__(self,addr=('',12788)): 80 self.sock = None 81 self.addr = addr 82 self.conns={} #{id} 83 self.mtxconns = threading.Lock() 84 85 def onConnectionPairBroken(self,cp): 86 self.mtxconns.acquire() 87 del self.conns[cp.id] 88 print 'onConnectionPairBroken(),removed:',cp.id 89 self.mtxconns.release() 90 91 def start(self): 92 try: 93 94 self.sock = socket.socket() 95 #print 'lll',self.addr 96 self.sock.bind( tuple(self.addr) ) 97 self.sock.listen(5) 98 99 self.thread = threading.Thread(target=self.service_loop) 100 self.thread.start() 101 print 'xbridge started!' 102 self.thread.join() 103 except: 104 traceback.print_exc() 105 return False 106 107 def shutdown(self): 108 self.sock.close() 109 110 111 def service_loop(self): 112 113 while True: 114 fdr = [] 115 fdr.append(self.sock) 116 infds,wr,e = select.select(fdr,[],[]) 117 if e: 118 print 'service thread exit ' 119 break 120 for s in infds: 121 if s == self.sock: #新連接到達 122 sock = None 123 try: 124 sock,peer = self.sock.accept() #異常產生表示self.sock被強行關閉 125 print 'new client incoming ',peer 126 except: 127 return 128 thread = threading.Thread(target=self.threadNewClient,args=(sock,)) 129 thread.start() 130 131 def threadNewClient(self,sock): 132 #等待注冊信息進入 ,5 秒超時 133 try: 134 fdr = [sock,] 135 print 'enter select ' 136 infds,wr,e = select.select(fdr,[],[],5) 137 138 if not infds: 139 sock.close() 140 print 'client register timeout' 141 return #接收超時 142 d = sock.recv(1024) 143 d = json.loads(d) 144 id,type = d['id'],d['type'] 145 connpair = None 146 print id,type 147 self.mtxconns.acquire() 148 if type == 'imageplay': 149 150 cp = ConnectionPair(self) 151 cp.id = id 152 cp.sock1 = sock 153 self.conns[id] = cp 154 cp.start() 155 156 elif type =='mapshow': 157 connpair = self.conns.get(id,None) 158 if connpair == None: #沒找到imageplay 159 sock.close() 160 print 'mapshow cannt found imageplay..' 161 else: 162 print 'mapclient matched!' 163 connpair.sock2 = sock 164 else: 165 print 'unknown command id:',id,type 166 167 self.mtxconns.release() 168 except: 169 sock.close() 170 traceback.print_exc() 171 172 if __name__=='__main__': 173 XBridge().start() #default '',12788 174
|