|
ZMQ(ZeroMQ)是一個消息處理隊列庫,模式有以下三種:
1、Request-Reply模式:客戶端在請求後,服務端必須響應。
2、Publish-Subscribe模式:廣播給所有client,沒有隊列緩存,斷開連接期間的數據將永遠丟失。client可以進行數據過濾。
3、Parallel Pipeline模式:push進行數據推送,worker進行數據緩存,pull進行數據競爭獲取處理。區別於Publish-Subscribe之處在於存在數據緩存和處理負載:當連接被斷開,數據不會丟失,重連後數據繼續發送到對端。

日志接口及客戶端
import copy
import time
import logging
import traceback
from io import StringIO

import zmq
from tornado.options import options


class ZMQHandler(logging.Handler):
    """Logging handler that ships records to a listener over a ZMQ PUSH socket.

    The handler connects (it does not bind) to ``tcp://host:port``; the
    receiving side is expected to bind a PULL socket on that address.
    """

    # Upper bound, in milliseconds, on how long close() waits for queued
    # records to be delivered before dropping them.  Without a bound,
    # terminating the context can block forever when no listener is running.
    CLOSE_LINGER_MS = 1000

    def __init__(self, host, port):
        """Create the PUSH socket and connect it to ``host``/``port``."""
        super().__init__()
        self.tcp_url = "tcp://{0}:{1}".format(host, port)
        # Keep a reference to the context so close() can terminate it.
        self.context = zmq.Context()
        self.zq = self.context.socket(zmq.PUSH)
        self.zq.connect(self.tcp_url)

    def prepare(self, record):
        """Return a picklable copy of *record* with the message pre-formatted.

        Mirrors the approach of ``logging.handlers.QueueHandler.prepare``:
        the message (including any exception / stack text) is rendered here
        and the attributes that commonly cannot be pickled (args, exc_info)
        are cleared on the copy.  The original record is left untouched so
        other handlers still see it unmodified.
        """
        message = self.format(record)
        prepared = copy.copy(record)
        prepared.message = message
        prepared.msg = message
        prepared.args = None
        prepared.exc_info = None
        prepared.exc_text = None
        prepared.stack_info = None
        return prepared

    def emit(self, record):
        """Send *record* to the listener without blocking.

        Failures are reported through ``Handler.handleError`` (stderr) rather
        than through the logging system: this handler is attached to the root
        logger below, so logging the failure would re-enter ``emit`` and
        recurse until the stack overflows whenever the send keeps failing.
        """
        try:
            self.zq.send_pyobj(self.prepare(record), flags=zmq.NOBLOCK, protocol=4)
        except Exception:
            self.handleError(record)

    def close(self):
        """Close the socket and terminate the context; safe to call twice."""
        self.acquire()
        try:
            sock, ctx = self.zq, self.context
            self.zq = None
            self.context = None
            if sock is not None:
                sock.close(linger=self.CLOSE_LINGER_MS)
            if ctx is not None:
                ctx.term()
        finally:
            self.release()
        super().close()


class ZMQListener(object):
    """Receive pickled log records on a ZMQ PULL socket and dispatch them.

    SECURITY NOTE: records are decoded with ``recv_pyobj`` (pickle).  Anyone
    who can reach the bound port can make this process unpickle arbitrary
    data, so bind only to trusted interfaces / networks.
    """

    def __init__(self, host, port, *handlers, respect_handler_level=False):
        """Bind to ``tcp://host:port`` and remember the target handlers.

        When *respect_handler_level* is true, a record is only passed to a
        handler whose level is less than or equal to the record's level.
        """
        self.tcp_url = "tcp://{0}:{1}".format(host, port)
        self.handlers = handlers
        self.respect_handler_level = respect_handler_level
        self.context = None
        self.zq = None
        self.connect()

    def connect(self):
        """Create a fresh context and bind the PULL socket to ``tcp_url``."""
        self.context = zmq.Context()
        self.zq = self.context.socket(zmq.PULL)
        self.zq.bind(self.tcp_url)

    def close(self):
        """Close the socket and terminate the context; safe to call twice."""
        sock, ctx = self.zq, self.context
        self.zq = None
        self.context = None
        if sock is not None:
            sock.close()
        if ctx is not None:
            ctx.term()

    def handle(self, record):
        """
        Handle a record.

        This just loops through the handlers offering them the record
        to handle.
        """
        for handler in self.handlers:
            if not self.respect_handler_level or record.levelno >= handler.level:
                handler.handle(record)

    def run(self):
        """Receive and dispatch records until an ``"EOF"`` record arrives.

        The loop also ends if the listener has been closed or its context
        terminated.  Errors are written to stderr instead of the logging
        system, because the root logger may itself be wired to a ZMQ handler.
        """
        while True:
            sock = self.zq
            if sock is None:
                # close() was called; nothing left to receive from.
                break
            try:
                record = sock.recv_pyobj(flags=zmq.NOBLOCK)
            except zmq.Again:
                # Nothing queued right now: back off instead of spinning.
                time.sleep(1)
                continue
            except zmq.ContextTerminated:
                break
            except zmq.ZMQError:
                # Unexpected socket error: report it, pause, and retry.
                traceback.print_exc()
                time.sleep(1)
                continue
            except Exception:
                # Payload could not be unpickled; drop it and keep serving.
                traceback.print_exc()
                continue

            if getattr(record, "msg", None) == "EOF":
                break
            try:
                self.handle(record)
            except Exception:
                # One failing handler must not stop the listener.
                traceback.print_exc()


# Module-level wiring: importing this module attaches a ZMQ handler to the
# root logger so every log call in the importing process is forwarded.
logger = logging.getLogger()
zmq_handler = ZMQHandler("127.0.0.1", 9825)
logger.setLevel(logging.DEBUG)
logger.addHandler(zmq_handler)


def test_zmq():
    """Manual smoke test: emit a few records through a ZMQHandler."""
    zq_handler = ZMQHandler("127.0.0.1", 9825)

    rootLogger = logging.getLogger('')
    rootLogger.setLevel(logging.WARN)
    # socketHandler = logging.handlers.SocketHandler('localhost',
    #                     logging.handlers.DEFAULT_TCP_LOGGING_PORT)
    # don't bother with a formatter, since a socket handler sends the event as
    # an unformatted pickle
    rootLogger.addHandler(zq_handler)

    # Now, we can log to the root logger, or any other logger. First the root
    logging.info('Jackdaws love my big sphinx of quartz.')

    # Now, define a couple of other loggers which might represent areas in your
    # application:

    logger1 = logging.getLogger('myapp.area1')
    logger2 = logging.getLogger('myapp.area2')

    logger1.debug('Quick zephyrs blow, vexing daft Jim.')
    logger1.info('How quickly daft jumping zebras vex.')
    logger2.warning('Jail zesty vixen who grabbed pay from quack.')
    logger2.error('The five boxing wizards jump quickly.')


def test_zmq_listener():
    """Manual smoke test: run a listener that prints records to the console."""
    handler = logging.StreamHandler()
    formatter = logging.Formatter('%(name)s: %(message)s')
    handler.setFormatter(formatter)
    # NOTE(review): this binds port 8090 while the handlers above send to
    # 9825 -- confirm whether the mismatch is intentional.
    zq_listener = ZMQListener("127.0.0.1", 8090, handler)
    zq_listener.run()

# The code above uses the third ZMQ mode (Parallel Pipeline, PUSH/PULL) to
# handle logs.
#
# Server side:
import logging
import os
import re
import sys
from logging.handlers import TimedRotatingFileHandler
from tornado.options import options, define

from settings import log_dir

# Command-line options.  logger_host is a host name / IP address, so it must
# be declared as str; declaring it as int makes "--logger_host=..." fail to
# parse.
define("logger_host", "127.0.0.1", type=str)
define("logger_port", 9825, type=int)
define("logger_dir", log_dir, type=str)
define("redis_host", "127.0.0.1", type=str)
define("redis_port", 6379, type=int)
define("redis_password", None, type=str)
define("redis_db", 12, type=int)
options.parse_command_line()
# NOTE(review): stream_handler is configured but never passed to the
# listener below -- confirm whether console output was intended.
stream_handler = logging.StreamHandler()

formatter = logging.Formatter('%(asctime)s [%(filename)s:%(lineno)s]: %(message)s')
stream_handler.setFormatter(formatter)

# One log file per listener port, rotated every 4 hours, keeping 18 backups.
file_handler = TimedRotatingFileHandler(
    os.path.join(options.logger_dir, "{0}".format(options.logger_port)),
    backupCount=18, when='h', interval=4)
file_handler.suffix = "%Y-%m-%d_%H-%M-%S.log"
# The handler finds old backups to delete by matching their suffix against
# extMatch.  The built-in pattern only matches the default hourly suffix, so
# it has to be replaced together with the custom suffix above; otherwise
# backupCount would never prune anything.
file_handler.extMatch = re.compile(
    r"^\d{4}-\d{2}-\d{2}_\d{2}-\d{2}-\d{2}\.log$", re.ASCII)
file_handler.setLevel(logging.DEBUG)
file_handler.setFormatter(formatter)
# Deliberately imported here rather than at the top of the file.
# NOTE(review): importing core.logger presumably runs its own module-level
# logging setup, so moving this above parse_command_line() could change how
# logging ends up configured -- confirm before reordering.
from core.logger import ZMQListener
zmq_listener = ZMQListener(options.logger_host, options.logger_port, file_handler)
try:
    zmq_listener.run()
finally:
    # Release the socket and context even when interrupted (e.g. Ctrl-C).
    zmq_listener.close()

# Start the server by running this script directly with python; this gives
# you remote log writing.
|