|
ZMQ(zeromq) 是一個(gè)消息處理隊(duì)列庫 模式有以下三種 1、Request-Reply模式:客戶端在請(qǐng)求后,服務(wù)端必須響應(yīng) 2、Publish-Subscribe模式:廣播所有client,沒有隊(duì)列緩存,斷開連接數(shù)據(jù)將永遠(yuǎn)丟失。client可以進(jìn)行數(shù)據(jù)過濾。 3、Parallel Pipeline模式:push進(jìn)行數(shù)據(jù)推送,work進(jìn)行數(shù)據(jù)緩存,pull進(jìn)行數(shù)據(jù)競(jìng)爭(zhēng)獲取處理。區(qū)別于Publish-Subscribe存在一個(gè)數(shù)據(jù)緩存和處理負(fù)載。當(dāng)連接被斷開,數(shù)據(jù)不會(huì)丟失,重連后數(shù)據(jù)繼續(xù)發(fā)送到對(duì)端。  日志接口及客戶端
1 import time 2 import logging 3 import traceback 4 from io import StringIO 5 6 import zmq 7 from tornado.options import options 8 9 10 class ZMQHandler(logging.Handler): 11 def __init__(self, host, port): 12 super().__init__() 13 self.tcp_url = "tcp://{0}:{1}".format(host, port) 14 context = zmq.Context() 15 self.zq = context.socket(zmq.PUSH) 16 self.zq.connect(self.tcp_url) 17 18 def emit(self, record): 19 try: 20 self.zq.send_pyobj(record, flags=zmq.NOBLOCK, protocol=4) 21 except Exception: 22 fp = StringIO() 23 traceback.print_exc(file=fp) 24 error = fp.getvalue() 25 logger.log(logging.ERROR, error) 26 # logger.log(logging.ERROR, "\n".join(error.split("\n")[:-10])) 27 28 29 class ZMQListener(object): 30 def __init__(self, host, port, *handlers, respect_handler_level=False): 31 self.tcp_url = "tcp://{0}:{1}".format(host, port) 32 self.handlers = handlers 33 self.respect_handler_level = respect_handler_level 34 self.context = None 35 self.zq = None 36 self.connect() 37 38 def connect(self): 39 self.context = zmq.Context() 40 self.zq = self.context.socket(zmq.PULL) 41 self.zq.bind(self.tcp_url) 42 43 def close(self): 44 self.zq.close() 45 self.context.term() 46 self.context = None 47 self.zq = None 48 49 def handle(self, record): 50 """ 51 Handle a record. 52 53 This just loops through the handlers offering them the record 54 to handle. 55 """ 56 for handler in self.handlers: 57 if not self.respect_handler_level: 58 process = True 59 else: 60 process = record.levelno >= handler.level 61 62 if process: 63 handler.handle(record) 64 65 def run(self): 66 while True: 67 try: 68 record = self.zq.recv_pyobj(flags=zmq.NOBLOCK) 69 if record.msg == "EOF": 70 break 71 self.handle(record) 72 except zmq.ZMQError: 73 time.sleep(1) 74 except Exception as e: 75 print(e) 76 77 78 logger = logging.getLogger() 79 zmq_handler = ZMQHandler("127.0.0.1", 9825) 80 logger.setLevel(logging.DEBUG) 81 logger.addHandler(zmq_handler) 82 83 84 def test_zmq(): 85 zq_handler = ZMQHandler("127.0.0.1", 9825) 86 87 rootLogger = logging.getLogger('') 88 rootLogger.setLevel(logging.WARN) 89 # socketHandler = logging.handlers.SocketHandler('localhost', 90 # logging.handlers.DEFAULT_TCP_LOGGING_PORT) 91 # don't bother with a formatter, since a socket handler sends the event as 92 # an unformatted pickle 93 rootLogger.addHandler(zq_handler) 94 95 # Now, we can log to the root logger, or any other logger. First the root 96 logging.info('Jackdaws love my big sphinx of quartz.') 97 98 # Now, define a couple of other loggers which might represent areas in your 99 # application: 100 101 logger1 = logging.getLogger('myapp.area1') 102 logger2 = logging.getLogger('myapp.area2') 103 104 logger1.debug('Quick zephyrs blow, vexing daft Jim.') 105 logger1.info('How quickly daft jumping zebras vex.') 106 logger2.warning('Jail zesty vixen who grabbed pay from quack.') 107 logger2.error('The five boxing wizards jump quickly.') 108 109 110 def test_zmq_listener(): 111 handler = logging.StreamHandler() 112 formatter = logging.Formatter('%(name)s: %(message)s') 113 handler.setFormatter(formatter) 114 zq_listener = ZMQListener("127.0.0.1", 8090, handler) 115 zq_listener.run() 116 代碼中顯示用的zmq的第三種模式處理日志  服務(wù)端
1 import logging 2 import os 3 import sys 4 from logging.handlers import TimedRotatingFileHandler 5 from tornado.options import options, define 6 7 from settings import log_dir 8 9 define("logger_host", "127.0.0.1", type=int) 10 define("logger_port", 9825, type=int) 11 define("logger_dir", log_dir, type=str) 12 define("redis_host", "127.0.0.1", type=str) 13 define("redis_port", 6379, type=int) 14 define("redis_password", None, type=str) 15 define("redis_db", 12, type=int) 16 options.parse_command_line() 17 stream_handler = logging.StreamHandler() 18 19 formatter = logging.Formatter('%(asctime)s [%(filename)s:%(lineno)s]: %(message)s') 20 stream_handler.setFormatter(formatter) 21 22 file_handler = TimedRotatingFileHandler( 23 os.path.join(options.logger_dir, "{0}".format(options.logger_port)), backupCount=18, when='h',interval=4) 24 file_handler.suffix="%Y-%m-%d_%H-%M-%S.log" 25 file_handler.setLevel(logging.DEBUG) 26 file_handler.formatter = formatter 27 from core.logger import ZMQListener 28 zmq_listener = ZMQListener(options.logger_host, options.logger_port, file_handler) 29 zmq_listener.run() 30 直接用python啟動(dòng)服務(wù)器就可以,這樣可實(shí)現(xiàn)日志的遠(yuǎn)程寫入
|