用python写一个预警机器人(支持微信和钉钉)

背景

线上的系统在运行中,发生故障时怎么及时的通过手机通知到相关人员?当然这是个很简单的需求,现有的方法有很多,例如:

欢迎大家加入小编创建的Python行业交流群,有大牛答疑,有资源共享,有企业招人!是一个非常不错的交流基地!群号:683380553

  1. 如果我们用的云产品,那么一般都会有配套对应的监控预警功能,根据需要配置一下即可,支持短信,邮箱通知。
  2. 如果我们已经搭建了一套运维监控系统,比如zabbix之类的,那么我们学会zabbix,然后配置也即可,支持短信,邮箱通知。

但如果我们希望有一个比较简单轻便,能灵活定制和快速实施的方法,又能同时支持微信和钉钉通知呢?以下就介绍这样一个基于python的简单方法,暂且起个名字叫robotprobe。

上路

在开始之前,先定义两种对象,robotprobe将由这两种对象组成。

Probe:探针,用于检查检测某项功能,某个指标是否正常,并包含预警相关规则配置。

Robot:机器人,发生异常情况时,要发送通知的对象,这里特指微信机器人和钉钉机器人。

Probe

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32

class Probe:
    def __init__(self, name, interval=10, threshold=3):
        self.name = name
        #检测间隔,单位秒
        self.interval = interval
        #当前是否有预警
        self.warning = False
        #预期消息
        self.warning_msg = None
        #预警计数器
        self.warning_counter = 0
        #阈值,预警次数达到阈值时发消息通知
        self.threshold = threshold
        #预警消息模板
        self.warning_msg_tpl = self.name + "预警:%s"

    def test(self):
        print(self.name + " testing")
        self.do_test()
        self.warning = self.warning_counter >= self.threshold
        if self.warning:
            print(self.name + " has warning:" + self.warning_msg)

    def do_test(self):
        raise NotImplementedError

    def consume_warning(self):
        warning_msg = self.warning_msg
        self.warning = False
        self.warning_msg = None
        self.warning_counter = 0
        return warning_msg

UrlProbe

1
2
3
4
5
6
7
8
9
10
11
12
13
14

class UrlProbe(Probe):
    def __init__(self, name, interval, threshold):
        super().__init__(name, interval, threshold)
        adapter = SSLAdapter('TLSv1')
        self.session = requests.Session()
        self.session.mount('https://', adapter)

    def do_test(self):
        try:
            self.session.get(self.url, timeout=10)
        except BaseException as e:
            self.warning_counter = self.warning_counter + 1
            self.warning_msg = self.warning_msg_tpl % str(e)
            print(self.warning_msg)

SqlProbe

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

class SqlProbe(Probe):
    def __init__(self, name, interval, threshold):
        super().__init__(name, interval, threshold)
        db = pymysql.connect(host='localhost', user='root',
                             passwd='xxxx', db='demo', port=3306, charset='utf8',
                             autocommit=True)
        self.cursor = db.cursor()

    def do_test(self):
        try:
            self.cursor.execute(self.sql)
            result = self.cursor.fetchone()
            #当sql查询的指标大于业务阈值时,发生预警
            if result[0] >= self.biz_threshold:
                self.warning_counter = self.warning_counter + 1
                #biz_tpl可定制业务预警消息模板
                self.warning_msg = self.warning_msg_tpl % (self.biz_tpl % (result[0], self.biz_threshold))
                print(self.warning_msg)
        except BaseException as e:
            self.warning_counter = self.warning_counter + 1
            self.warning_msg = self.warning_msg_tpl % str(e)
            print(self.warning_msg)

其他

其他类型的Probe则可以根据实际需要自由扩展实现。

Robot

1
2
3
4
5
6

class Robot:
    def __init__(self):
        pass

    def send_text(self, msg):
        raise NotImplementedError

WechatRobot

1
2
3
4
5
6
7
8
9
10
11
12
13

class WechatRobot(Robot):
    def __init__(self, chat_names):
        bot = Bot(console_qr=True, cache_path=True)
        self.chats = bot.search(chat_names)
        super().__init__()

    def send_text(self, msg):
        for chat in self.chats:
            chat.send_msg(msg)

    @staticmethod
    def run_embed():
        ThreadPoolExecutor(1).submit(embed)

DingTalkRobot

1
2
3
4
5
6
7

class DingTalkRobot(Robot):
    def __init__(self, webhook):
        self.bot = DingtalkChatbot(webhook)
        super().__init__()

    def send_text(self, msg):
        self.bot.send_text(msg, is_at_all=True)

微信VS钉钉

微信机器人基于wxpy实现,wxpy功能很丰富,基本微信上收发消息相关的功能都可以用它来实现,这篇文章有个挺好的使用示例,我们这里只用到发送消息的功能,微信虽然功能,但有个缺点就是用wxpy发消息(其实是网页版微信),账号有可能被封,另外新注册的微信号也是不能用的。

钉钉机器人基于DingtalkChatbot实现,优点就是官方默认提供机器人的功能,虽然发消息有限制,但一可以控制发消息的频率,二也可加多个机器人去分流,缺点就是只能发消息,不收消息,进而根据收到的消息做定制回复。

Demo示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54

import time

import schedule

from probe import UrlProbe, SqlProbe
from robot import DingTalkRobot, WechatRobot

webhook = 'https://oapi.dingtalk.com/robot/send?access_token=xxxxx'
bot = DingTalkRobot(webhook)

# bot = WechatRobot([u"研发部", u"运维部", u"someone you hate"])
# bot.run_embed()

# 每20秒检测百度网站是否正常,如果不正常超3次,则预警
baidu_probe = UrlProbe("百度", 20, 3)
baidu_probe.url = "https://www.baidux.com"

# 每30秒检测在线用户数是否太高,如果不正常达到1次,则预警
online_user_probe = SqlProbe("在线用户数", 30, 1)
online_user_probe.sql = "select count(1) from online_user"
online_user_probe.biz_threshold = 50000
online_user_probe.biz_tpl = "当前在线用户数%s,达到阈值%s"

probes = [baidu_probe, online_user_probe]


def notify_latest_state():
    for p in probes:
        if p.warning:
            bot.send_text(p.name + "异常:" + p.warning_msg, is_at_all=True)
        else:
            bot.send_text(p.name + "正常", is_at_all=True)


def send_warning(msg):
    bot.send_text(msg, is_at_all=True)


def trigger_probe(p):
    p.test()
    if p.warning:
        send_warning(p.consume_warning())


schedule.every().day.at("09:00").do(notify_latest_state)

for probe in probes:
    schedule.every(probe.interval).seconds.do(trigger_probe, probe)

if __name__ == '__main__':
    while True:
        schedule.run_pending()
        time.sleep(1)
        print("I am alive!")

依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14

certifi==2019.3.9
chardet==3.0.4
DingtalkChatbot==1.3.0
future==0.17.1
idna==2.8
itchat==1.2.32
PyMySQL==0.9.3
pypng==0.0.19
PyQRCode==1.2.1
requests==2.21.0
requests-toolbelt==0.9.1
schedule==0.6.0
urllib3==1.24.1
wxpy==0.3.9.8

    原文作者:萌新程序员
    原文地址: https://zhuanlan.zhihu.com/p/59278794
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞