python zmq實現日志服務
ZMQ(zeromq) 是一個消息處理隊列庫 模式有以下三種
1、Request-Reply模式:客戶端在請求后,服務端必須響應
2、Publish-Subscribe模式:廣播所有client,沒有隊列緩存,斷開連接數據將永遠丟失。client可以進行數據過濾。
3、Parallel Pipeline模式:push進行數據推送,work進行數據緩存,pull進行數據競爭獲取處理。區別于Publish-Subscribe存在一個數據緩存和處理負載。當連接被斷開,數據不會丟失,重連后數據繼續發送到對端。

日志接口及客戶端
1 import time
2 import logging
3 import traceback
4 from io import StringIO
5
6 import zmq
7 from tornado.options import options
8
9
class ZMQHandler(logging.Handler):
    """Logging handler that pushes records to a listener over a ZeroMQ PUSH socket.

    Every record is pickled with ``send_pyobj`` and sent without blocking to
    ``tcp://host:port``.
    """

    def __init__(self, host, port):
        """Create the PUSH socket and connect it to ``tcp://host:port``.

        Args:
            host: Host name or IP address of the listener.
            port: TCP port of the listener.
        """
        super().__init__()
        self.tcp_url = "tcp://{0}:{1}".format(host, port)
        context = zmq.Context()
        self.zq = context.socket(zmq.PUSH)
        self.zq.connect(self.tcp_url)

    def _prepare(self, record):
        """Return a copy of *record* that is safe to pickle.

        The message is merged with its arguments and ``exc_info`` is dropped,
        because argument objects and traceback objects are not reliably
        picklable. The caller's record is left untouched so other handlers
        still see the original.
        """
        import copy

        if record.exc_info:
            # Formatter.format() caches the rendered traceback in
            # record.exc_text, so the receiver can still print it after
            # exc_info has been removed from the copy.
            self.format(record)
        outgoing = copy.copy(record)
        outgoing.msg = record.getMessage()
        outgoing.args = None
        outgoing.exc_info = None
        return outgoing

    def emit(self, record):
        """Send *record* to the listener; never raises.

        Args:
            record: The ``logging.LogRecord`` to ship.
        """
        try:
            self.zq.send_pyobj(self._prepare(record), flags=zmq.NOBLOCK, protocol=4)
        except Exception:
            # Do not report the failure through the logging system: this
            # handler may be attached to the logger used for the report, and
            # every failed send would then trigger another send.
            # handleError() writes the traceback to stderr instead (subject
            # to logging.raiseExceptions).
            self.handleError(record)
27
28
class ZMQListener(object):
    """Receive pickled log records on a ZeroMQ PULL socket and hand them to handlers."""

    def __init__(self, host, port, *handlers, respect_handler_level=False):
        """Bind a PULL socket on ``tcp://host:port``.

        Args:
            host: Interface address to bind.
            port: TCP port to bind.
            *handlers: ``logging.Handler`` objects that receive every record.
            respect_handler_level: When true, a handler only gets records
                whose level is at least the handler's own level.
        """
        self.tcp_url = "tcp://{0}:{1}".format(host, port)
        self.handlers = handlers
        self.respect_handler_level = respect_handler_level
        self.context = None
        self.zq = None
        self.connect()

    def connect(self):
        """Create a fresh context and PULL socket and bind it to ``tcp_url``."""
        self.context = zmq.Context()
        self.zq = self.context.socket(zmq.PULL)
        self.zq.bind(self.tcp_url)

    def close(self):
        """Close the socket and terminate the context; safe to call repeatedly."""
        if self.zq is not None:
            self.zq.close()
            self.zq = None
        if self.context is not None:
            self.context.term()
            self.context = None

    def handle(self, record):
        """Offer *record* to each handler, honouring ``respect_handler_level``."""
        for handler in self.handlers:
            if not self.respect_handler_level or record.levelno >= handler.level:
                handler.handle(record)

    def run(self, poll_timeout=1000):
        """Dispatch incoming records until one whose ``msg`` is ``"EOF"`` arrives.

        Args:
            poll_timeout: Milliseconds to wait for input on each iteration.

        The loop also ends once ``close()`` has released the socket.
        """
        while True:
            sock = self.zq
            if sock is None:
                # close() was called; there is nothing left to read from.
                break
            try:
                # Wait for input instead of sleeping after a failed read, so
                # a record is handled as soon as it arrives.
                if not sock.poll(poll_timeout):
                    continue
                # SECURITY: recv_pyobj unpickles whatever a peer sends. Only
                # bind this socket on a trusted interface/network.
                record = sock.recv_pyobj(flags=zmq.NOBLOCK)
                if record.msg == "EOF":
                    break
                self.handle(record)
            except zmq.Again:
                # Nothing was readable after all; poll again.
                continue
            except zmq.ZMQError:
                # A real transport error: report it, then back off before
                # retrying so a persistent failure does not spin.
                traceback.print_exc()
                time.sleep(1)
            except Exception:
                # Keep the listener alive if one record or handler misbehaves,
                # but keep the full traceback for diagnosis.
                traceback.print_exc()
76
77
# Root logger, configured when this module is imported: accept every level
# and forward records to the ZeroMQ listener expected at 127.0.0.1:9825.
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
zmq_handler = ZMQHandler("127.0.0.1", 9825)
logger.addHandler(zmq_handler)
82
83
def test_zmq():
    """Demo sender: attach a ZMQHandler to the root logger and emit sample records.

    The root level is set to WARN, so unless the named loggers are given
    their own level elsewhere, only the ``warning`` and ``error`` calls
    below pass the level check and reach the handlers.
    """
    push_handler = ZMQHandler("127.0.0.1", 9825)

    root = logging.getLogger('')
    root.setLevel(logging.WARN)
    # No formatter is set on the handler: it ships the record object itself,
    # and formatting is left to whatever handlers the receiving side uses.
    root.addHandler(push_handler)

    # First log through the module-level helper, which targets the root logger.
    logging.info('Jackdaws love my big sphinx of quartz.')

    # Then through two named loggers standing in for areas of an application.
    area1 = logging.getLogger('myapp.area1')
    area2 = logging.getLogger('myapp.area2')

    area1.debug('Quick zephyrs blow, vexing daft Jim.')
    area1.info('How quickly daft jumping zebras vex.')
    area2.warning('Jail zesty vixen who grabbed pay from quack.')
    area2.error('The five boxing wizards jump quickly.')
108
109
def test_zmq_listener():
    """Demo receiver: print every received record to the console.

    Blocks inside ``ZMQListener.run`` until the listener stops, then releases
    the socket and context.
    """
    handler = logging.StreamHandler()
    handler.setFormatter(logging.Formatter('%(name)s: %(message)s'))
    # Bind the port the ZMQHandler instances in this module connect to
    # (9825); a listener on 8090 never receives what they send.
    zq_listener = ZMQListener("127.0.0.1", 9825, handler)
    try:
        zq_listener.run()
    finally:
        zq_listener.close()
116


1 import time
2 import logging
3 import traceback
4 from io import StringIO
5
6 import zmq
7 from tornado.options import options
8
9
class ZMQHandler(logging.Handler):
    """Logging handler that pushes records to a listener over a ZeroMQ PUSH socket.

    Every record is pickled with ``send_pyobj`` and sent without blocking to
    ``tcp://host:port``.
    """

    def __init__(self, host, port):
        """Create the PUSH socket and connect it to ``tcp://host:port``.

        Args:
            host: Host name or IP address of the listener.
            port: TCP port of the listener.
        """
        super().__init__()
        self.tcp_url = "tcp://{0}:{1}".format(host, port)
        context = zmq.Context()
        self.zq = context.socket(zmq.PUSH)
        self.zq.connect(self.tcp_url)

    def _prepare(self, record):
        """Return a copy of *record* that is safe to pickle.

        The message is merged with its arguments and ``exc_info`` is dropped,
        because argument objects and traceback objects are not reliably
        picklable. The caller's record is left untouched so other handlers
        still see the original.
        """
        import copy

        if record.exc_info:
            # Formatter.format() caches the rendered traceback in
            # record.exc_text, so the receiver can still print it after
            # exc_info has been removed from the copy.
            self.format(record)
        outgoing = copy.copy(record)
        outgoing.msg = record.getMessage()
        outgoing.args = None
        outgoing.exc_info = None
        return outgoing

    def emit(self, record):
        """Send *record* to the listener; never raises.

        Args:
            record: The ``logging.LogRecord`` to ship.
        """
        try:
            self.zq.send_pyobj(self._prepare(record), flags=zmq.NOBLOCK, protocol=4)
        except Exception:
            # Do not report the failure through the logging system: this
            # handler may be attached to the logger used for the report, and
            # every failed send would then trigger another send.
            # handleError() writes the traceback to stderr instead (subject
            # to logging.raiseExceptions).
            self.handleError(record)
27
28
class ZMQListener(object):
    """Receive pickled log records on a ZeroMQ PULL socket and hand them to handlers."""

    def __init__(self, host, port, *handlers, respect_handler_level=False):
        """Bind a PULL socket on ``tcp://host:port``.

        Args:
            host: Interface address to bind.
            port: TCP port to bind.
            *handlers: ``logging.Handler`` objects that receive every record.
            respect_handler_level: When true, a handler only gets records
                whose level is at least the handler's own level.
        """
        self.tcp_url = "tcp://{0}:{1}".format(host, port)
        self.handlers = handlers
        self.respect_handler_level = respect_handler_level
        self.context = None
        self.zq = None
        self.connect()

    def connect(self):
        """Create a fresh context and PULL socket and bind it to ``tcp_url``."""
        self.context = zmq.Context()
        self.zq = self.context.socket(zmq.PULL)
        self.zq.bind(self.tcp_url)

    def close(self):
        """Close the socket and terminate the context; safe to call repeatedly."""
        if self.zq is not None:
            self.zq.close()
            self.zq = None
        if self.context is not None:
            self.context.term()
            self.context = None

    def handle(self, record):
        """Offer *record* to each handler, honouring ``respect_handler_level``."""
        for handler in self.handlers:
            if not self.respect_handler_level or record.levelno >= handler.level:
                handler.handle(record)

    def run(self, poll_timeout=1000):
        """Dispatch incoming records until one whose ``msg`` is ``"EOF"`` arrives.

        Args:
            poll_timeout: Milliseconds to wait for input on each iteration.

        The loop also ends once ``close()`` has released the socket.
        """
        while True:
            sock = self.zq
            if sock is None:
                # close() was called; there is nothing left to read from.
                break
            try:
                # Wait for input instead of sleeping after a failed read, so
                # a record is handled as soon as it arrives.
                if not sock.poll(poll_timeout):
                    continue
                # SECURITY: recv_pyobj unpickles whatever a peer sends. Only
                # bind this socket on a trusted interface/network.
                record = sock.recv_pyobj(flags=zmq.NOBLOCK)
                if record.msg == "EOF":
                    break
                self.handle(record)
            except zmq.Again:
                # Nothing was readable after all; poll again.
                continue
            except zmq.ZMQError:
                # A real transport error: report it, then back off before
                # retrying so a persistent failure does not spin.
                traceback.print_exc()
                time.sleep(1)
            except Exception:
                # Keep the listener alive if one record or handler misbehaves,
                # but keep the full traceback for diagnosis.
                traceback.print_exc()
76
77
# Root logger, configured when this module is imported: accept every level
# and forward records to the ZeroMQ listener expected at 127.0.0.1:9825.
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
zmq_handler = ZMQHandler("127.0.0.1", 9825)
logger.addHandler(zmq_handler)
82
83
def test_zmq():
    """Demo sender: attach a ZMQHandler to the root logger and emit sample records.

    The root level is set to WARN, so unless the named loggers are given
    their own level elsewhere, only the ``warning`` and ``error`` calls
    below pass the level check and reach the handlers.
    """
    push_handler = ZMQHandler("127.0.0.1", 9825)

    root = logging.getLogger('')
    root.setLevel(logging.WARN)
    # No formatter is set on the handler: it ships the record object itself,
    # and formatting is left to whatever handlers the receiving side uses.
    root.addHandler(push_handler)

    # First log through the module-level helper, which targets the root logger.
    logging.info('Jackdaws love my big sphinx of quartz.')

    # Then through two named loggers standing in for areas of an application.
    area1 = logging.getLogger('myapp.area1')
    area2 = logging.getLogger('myapp.area2')

    area1.debug('Quick zephyrs blow, vexing daft Jim.')
    area1.info('How quickly daft jumping zebras vex.')
    area2.warning('Jail zesty vixen who grabbed pay from quack.')
    area2.error('The five boxing wizards jump quickly.')
108
109
def test_zmq_listener():
    """Demo receiver: print every received record to the console.

    Blocks inside ``ZMQListener.run`` until the listener stops, then releases
    the socket and context.
    """
    handler = logging.StreamHandler()
    handler.setFormatter(logging.Formatter('%(name)s: %(message)s'))
    # Bind the port the ZMQHandler instances in this module connect to
    # (9825); a listener on 8090 never receives what they send.
    zq_listener = ZMQListener("127.0.0.1", 9825, handler)
    try:
        zq_listener.run()
    finally:
        zq_listener.close()
116
代碼中顯示用的zmq的第三種模式處理日志

服務端
1 import logging
2 import os
3 import sys
4 from logging.handlers import TimedRotatingFileHandler
5 from tornado.options import options, define
6
7 from settings import log_dir
8
# Command-line options (tornado.options); the defaults apply when a flag is
# omitted. logger_host holds a host name / IP string, so it is declared as
# str: declared as int, any --logger_host=<address> value fails to parse.
define("logger_host", "127.0.0.1", type=str)
define("logger_port", 9825, type=int)
define("logger_dir", log_dir, type=str)
define("redis_host", "127.0.0.1", type=str)
define("redis_port", 6379, type=int)
define("redis_password", None, type=str)
define("redis_db", 12, type=int)
options.parse_command_line()

formatter = logging.Formatter('%(asctime)s [%(filename)s:%(lineno)s]: %(message)s')

# NOTE(review): stream_handler is configured but never passed to the listener
# below, so nothing is echoed to the console - confirm whether that is intended.
stream_handler = logging.StreamHandler()
stream_handler.setFormatter(formatter)

# One log file per listener port, rotated every 4 hours, keeping 18 backups.
file_handler = TimedRotatingFileHandler(
    os.path.join(options.logger_dir, "{0}".format(options.logger_port)),
    when='h', interval=4, backupCount=18)
file_handler.suffix = "%Y-%m-%d_%H-%M-%S.log"
# The handler finds old backups to delete by matching their suffix against
# extMatch. The default pattern for hourly rotation does not match the custom
# suffix above, so without this override backupCount would never remove files.
import re
file_handler.extMatch = re.compile(r"^\d{4}-\d{2}-\d{2}_\d{2}-\d{2}-\d{2}\.log$", re.ASCII)
file_handler.setLevel(logging.DEBUG)
file_handler.setFormatter(formatter)

from core.logger import ZMQListener

# Blocks here, writing every received record to the rotating file.
zmq_listener = ZMQListener(options.logger_host, options.logger_port, file_handler)
zmq_listener.run()
30
直接用python啟動服務器就可以,這樣可實現日志的遠程寫入


1 import logging
2 import os
3 import sys
4 from logging.handlers import TimedRotatingFileHandler
5 from tornado.options import options, define
6
7 from settings import log_dir
8
# Command-line options (tornado.options); the defaults apply when a flag is
# omitted. logger_host holds a host name / IP string, so it is declared as
# str: declared as int, any --logger_host=<address> value fails to parse.
define("logger_host", "127.0.0.1", type=str)
define("logger_port", 9825, type=int)
define("logger_dir", log_dir, type=str)
define("redis_host", "127.0.0.1", type=str)
define("redis_port", 6379, type=int)
define("redis_password", None, type=str)
define("redis_db", 12, type=int)
options.parse_command_line()

formatter = logging.Formatter('%(asctime)s [%(filename)s:%(lineno)s]: %(message)s')

# NOTE(review): stream_handler is configured but never passed to the listener
# below, so nothing is echoed to the console - confirm whether that is intended.
stream_handler = logging.StreamHandler()
stream_handler.setFormatter(formatter)

# One log file per listener port, rotated every 4 hours, keeping 18 backups.
file_handler = TimedRotatingFileHandler(
    os.path.join(options.logger_dir, "{0}".format(options.logger_port)),
    when='h', interval=4, backupCount=18)
file_handler.suffix = "%Y-%m-%d_%H-%M-%S.log"
# The handler finds old backups to delete by matching their suffix against
# extMatch. The default pattern for hourly rotation does not match the custom
# suffix above, so without this override backupCount would never remove files.
import re
file_handler.extMatch = re.compile(r"^\d{4}-\d{2}-\d{2}_\d{2}-\d{2}-\d{2}\.log$", re.ASCII)
file_handler.setLevel(logging.DEBUG)
file_handler.setFormatter(formatter)

from core.logger import ZMQListener

# Blocks here, writing every received record to the rotating file.
zmq_listener = ZMQListener(options.logger_host, options.logger_port, file_handler)
zmq_listener.run()
30
posted on 2019-09-21 17:01 Benjamin 閱讀(901) 評論(0) 編輯 收藏 引用 所屬分類: python