背景
线上的系统在运行中,发生故障时怎么及时的通过手机通知到相关人员?当然这是个很简单的需求,现有的方法有很多,例如:
欢迎大家加入小编创建的Python行业交流群,有大牛答疑,有资源共享,有企业招人!是一个非常不错的交流基地!群号:683380553
- 如果我们用的云产品,那么一般都会有配套对应的监控预警功能,根据需要配置一下即可,支持短信,邮箱通知。
- 如果我们已经搭建了一套运维监控系统,比如zabbix之类的,那么我们学会zabbix,然后配置也即可,支持短信,邮箱通知。
- …
但如果我们希望有一个比较简单轻便,能灵活定制和快速实施的方法,又能同时支持微信和钉钉通知呢?以下就介绍这样一个基于python的简单方法,暂且起个名字叫robotprobe。
上路
在开始之前,先定义两种对象,robotprobe将由这两种对象组成。
Probe:探针,用于检查检测某项功能,某个指标是否正常,并包含预警相关规则配置。
Robot:机器人,发生异常情况时,要发送通知的对象,这里特指微信机器人和钉钉机器人。
Probe
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
class Probe:
def __init__(self, name, interval=10, threshold=3):
self.name = name
#检测间隔,单位秒
self.interval = interval
#当前是否有预警
self.warning = False
#预期消息
self.warning_msg = None
#预警计数器
self.warning_counter = 0
#阈值,预警次数达到阈值时发消息通知
self.threshold = threshold
#预警消息模板
self.warning_msg_tpl = self.name + "预警:%s"
def test(self):
print(self.name + " testing")
self.do_test()
self.warning = self.warning_counter >= self.threshold
if self.warning:
print(self.name + " has warning:" + self.warning_msg)
def do_test(self):
raise NotImplementedError
def consume_warning(self):
warning_msg = self.warning_msg
self.warning = False
self.warning_msg = None
self.warning_counter = 0
return warning_msg
UrlProbe
1
2
3
4
5
6
7
8
9
10
11
12
13
14
class UrlProbe(Probe):
def __init__(self, name, interval, threshold):
super().__init__(name, interval, threshold)
adapter = SSLAdapter('TLSv1')
self.session = requests.Session()
self.session.mount('https://', adapter)
def do_test(self):
try:
self.session.get(self.url, timeout=10)
except BaseException as e:
self.warning_counter = self.warning_counter + 1
self.warning_msg = self.warning_msg_tpl % str(e)
print(self.warning_msg)
SqlProbe
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
class SqlProbe(Probe):
def __init__(self, name, interval, threshold):
super().__init__(name, interval, threshold)
db = pymysql.connect(host='localhost', user='root',
passwd='xxxx', db='demo', port=3306, charset='utf8',
autocommit=True)
self.cursor = db.cursor()
def do_test(self):
try:
self.cursor.execute(self.sql)
result = self.cursor.fetchone()
#当sql查询的指标大于业务阈值时,发生预警
if result[0] >= self.biz_threshold:
self.warning_counter = self.warning_counter + 1
#biz_tpl可定制业务预警消息模板
self.warning_msg = self.warning_msg_tpl % (self.biz_tpl % (result[0], self.biz_threshold))
print(self.warning_msg)
except BaseException as e:
self.warning_counter = self.warning_counter + 1
self.warning_msg = self.warning_msg_tpl % str(e)
print(self.warning_msg)
其他
其他类型的Probe则可以根据实际需要自由扩展实现。
Robot
1
2
3
4
5
6
class Robot:
def __init__(self):
pass
def send_text(self, msg):
raise NotImplementedError
WechatRobot
1
2
3
4
5
6
7
8
9
10
11
12
13
class WechatRobot(Robot):
def __init__(self, chat_names):
bot = Bot(console_qr=True, cache_path=True)
self.chats = bot.search(chat_names)
super().__init__()
def send_text(self, msg):
for chat in self.chats:
chat.send_msg(msg)
@staticmethod
def run_embed():
ThreadPoolExecutor(1).submit(embed)
DingTalkRobot
1
2
3
4
5
6
7
class DingTalkRobot(Robot):
def __init__(self, webhook):
self.bot = DingtalkChatbot(webhook)
super().__init__()
def send_text(self, msg):
self.bot.send_text(msg, is_at_all=True)
微信VS钉钉
微信机器人基于wxpy实现,wxpy功能很丰富,基本微信上收发消息相关的功能都可以用它来实现,这篇文章有个挺好的使用示例,我们这里只用到发送消息的功能,微信虽然功能,但有个缺点就是用wxpy发消息(其实是网页版微信),账号有可能被封,另外新注册的微信号也是不能用的。
钉钉机器人基于DingtalkChatbot实现,优点就是官方默认提供机器人的功能,虽然发消息有限制,但一可以控制发消息的频率,二也可加多个机器人去分流,缺点就是只能发消息,不收消息,进而根据收到的消息做定制回复。
Demo示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
import time
import schedule
from probe import UrlProbe, SqlProbe
from robot import DingTalkRobot, WechatRobot
webhook = 'https://oapi.dingtalk.com/robot/send?access_token=xxxxx'
bot = DingTalkRobot(webhook)
# bot = WechatRobot([u"研发部", u"运维部", u"someone you hate"])
# bot.run_embed()
# 每20秒检测百度网站是否正常,如果不正常超3次,则预警
baidu_probe = UrlProbe("百度", 20, 3)
baidu_probe.url = "https://www.baidux.com"
# 每30秒检测在线用户数是否太高,如果不正常达到1次,则预警
online_user_probe = SqlProbe("在线用户数", 30, 1)
online_user_probe.sql = "select count(1) from online_user"
online_user_probe.biz_threshold = 50000
online_user_probe.biz_tpl = "当前在线用户数%s,达到阈值%s"
probes = [baidu_probe, online_user_probe]
def notify_latest_state():
for p in probes:
if p.warning:
bot.send_text(p.name + "异常:" + p.warning_msg, is_at_all=True)
else:
bot.send_text(p.name + "正常", is_at_all=True)
def send_warning(msg):
bot.send_text(msg, is_at_all=True)
def trigger_probe(p):
p.test()
if p.warning:
send_warning(p.consume_warning())
schedule.every().day.at("09:00").do(notify_latest_state)
for probe in probes:
schedule.every(probe.interval).seconds.do(trigger_probe, probe)
if __name__ == '__main__':
while True:
schedule.run_pending()
time.sleep(1)
print("I am alive!")
依赖
1
2
3
4
5
6
7
8
9
10
11
12
13
14
certifi==2019.3.9
chardet==3.0.4
DingtalkChatbot==1.3.0
future==0.17.1
idna==2.8
itchat==1.2.32
PyMySQL==0.9.3
pypng==0.0.19
PyQRCode==1.2.1
requests==2.21.0
requests-toolbelt==0.9.1
schedule==0.6.0
urllib3==1.24.1
wxpy==0.3.9.8