基于ryu实现网络的流量监控--monitor

程序均为ryubook的案例,在此记录一下我对程序的理解和实验的执行步骤。如系统学习,可以下载ryubook进行学习,此外可以参考https://ryu.readthedocs.io/en/latest/getting_started.html,这是官方的网站,里面详细阐述了各种功能和开发步骤。

背景:

流量监控,即针对交换机加入流量监控的功能,包括监控错包,发送包的数量等一系列的操作。因为,网络已经成为许多服务或业务的基础建设,所以维护一个稳定的网络环境是必要的。但是网络问题总是不断地发生。网络发生异常的时候,必须快速的找到原因,并且尽速恢复原状。因此,为了网络的安全以及业务的正常运作,持续注意网络的健康状况是最基本的工作。

原理:

对于流量监控,首先,需要实现二层交换机的基本功能。然后,需要对交换机的状态进行监控,即交换机的上线和下线情况;需要对交换机进行数据请求,包括请求端口信息和流表信息,由于流量监控是持续性的对网络设备进行监控,所有需要开启一个线程持续对交换机发送请求信息;向交换机发送请求信息后,交换机就会回送请求报文,也就是会向控制器发送端口和流表的信息,此时就需要对数据进行解析并显示出来。这样就实现了流量监控的基本功能,工作流程如下图。

《基于ryu实现网络的流量监控--monitor》

代码编写:

 首先,编写类SimpleMonitor(),进行初始化,定义一个datapaths字典。

class SimpleMonitor( simple_switch_13 . SimpleSwitch13 ):
    def __init__(self, *args, **kwargs):
        super(SimpleMonitor , self).__init__(*args, **kwargs)
        self.datapaths = {}

然后编写监听交换机状态的方法,判断交换机是否在线,然后再将交换机写入字典当中。

@set_ev_cls(ofp_event.EventOFPStateChange,[MAIN_DISPATCHER, DEAD_DISPATCHER])  
#对事件进行监听,MAIN_DISPATCHER, DEAD_DISPATCHER主要用于判断交换机状态
    def _state_change_handler(self, ev):
        datapath = ev.datapath
        if ev.state == MAIN_DISPATCHER: 
            if datapath.id not in self.datapaths:
                self.logger.debug('register datapath: %016x', datapath.id)
                self.datapaths[datapath.id] = datapath
        elif ev.state == DEAD_DISPATCHER:
            if datapath.id in self.datapaths:
                self.logger.debug('unregister datapath: %016x', datapath.id)
                del self.datapaths[datapath.id]    

 

然后编写请求方法,实现控制器对交换机进行请求,包括请求流表和端口的信息。对于这个方法的编写,先对信息进行解析,即对版本信息和parser进行解析,然后调用ofproto_v1_3_parser里面的OFPFlowStatsRequest()和OFPPortStatsRequest()对流表和端口进行下发请求操作。

    def _request_stats(self, datapath):
        self.logger.debug('send stats request: %016x', datapath.id)
        ofproto = datapath.ofproto
        parser = datapath.ofproto_parser

        req = parser.OFPFlowStatsRequest(datapath)
        datapath.send_msg(req)

        req = parser.OFPPortStatsRequest(datapath, 0, ofproto.OFPP_ANY)
        datapath.send_msg(req)

再编写方法,周期性的向交换机发送信息。

    def _monitor(self):
        while True:
            for dp in self.datapaths.values():
                self._request_stats(dp)
            hub.sleep(10)  #每隔10秒发送一次请求数据

最后,是对收到的端口信息和流表信息进行解析并显示在终端设备中,解析数据的数据格式在ofproto_v1_3_parser中写的有(因为用的是1.3版本的)。例如下面这一段就是ofproto_v1_3_parser中给出的例子。

Example::

        @set_ev_cls(ofp_event.EventOFPPortStatsReply, MAIN_DISPATCHER)
        def port_stats_reply_handler(self, ev):
            ports = []
            for stat in ev.msg.body:
                ports.append('port_no=%d '
                             'rx_packets=%d tx_packets=%d '
                             'rx_bytes=%d tx_bytes=%d '
                             'rx_dropped=%d tx_dropped=%d '
                             'rx_errors=%d tx_errors=%d '
                             'rx_frame_err=%d rx_over_err=%d rx_crc_err=%d '
                             'collisions=%d duration_sec=%d duration_nsec=%d' %
                             (stat.port_no,
                              stat.rx_packets, stat.tx_packets,
                              stat.rx_bytes, stat.tx_bytes,
                              stat.rx_dropped, stat.tx_dropped,
                              stat.rx_errors, stat.tx_errors,
                              stat.rx_frame_err, stat.rx_over_err,
                              stat.rx_crc_err, stat.collisions,
                              stat.duration_sec, stat.duration_nsec))
            self.logger.debug('PortStats: %s', ports)
    """

代码如下

@set_ev_cls(ofp_event.EventOFPFlowStatsReply, MAIN_DISPATCHER)
    def _flow_stats_reply_handler(self, ev):
        body = ev.msg.body

        self.logger.info('datapath         '
                         'in-port  eth-dst           '
                         'out-port packets  bytes')
        self.logger.info('---------------- '
                         '-------- ----------------- '
                         '-------- -------- --------')
        for stat in sorted([flow for flow in body if flow.priority == 1],
                           key=lambda flow: (flow.match['in_port'],
                                             flow.match['eth_dst'])):
            self.logger.info('%016x %8x %17s %8x %8d %8d',
                             ev.msg.datapath.id,
                             stat.match['in_port'], stat.match['eth_dst'],
                             stat.instructions[0].actions[0].port,
                             stat.packet_count, stat.byte_count)

    @set_ev_cls(ofp_event.EventOFPPortStatsReply, MAIN_DISPATCHER)
    def _port_stats_reply_handler(self, ev):
        body = ev.msg.body

        self.logger.info('datapath         port     '
                         'rx-pkts  rx-bytes rx-error '
                         'tx-pkts  tx-bytes tx-error')
        self.logger.info('---------------- -------- '
                         '-------- -------- -------- '
                         '-------- -------- --------')
        for stat in sorted(body, key=attrgetter('port_no')):
            self.logger.info('%016x %8x %8d %8d %8d %8d %8d %8d',
                             ev.msg.datapath.id, stat.port_no,
                             stat.rx_packets, stat.rx_bytes, stat.rx_errors,
                             stat.tx_packets, stat.tx_bytes, stat.tx_errors)

至此,程序编写完成,就可以在终端中运行了。

实验步骤:

首先,打开控制器所在的终端,输入命令ryu-manager –verbose simple_monitor_13.py 执行流量监控程序,如下图

《基于ryu实现网络的流量监控--monitor》

然后,在mininet中模拟一个简单的拓扑,如图所示。

《基于ryu实现网络的流量监控--monitor》

运行拓扑后,用h1 ping h2,接下来观察控制器终端中显示的内容。

《基于ryu实现网络的流量监控--monitor》

 

 

以下是详细代码

from operator import attrgetter

from ryu.app import simple_switch_13
from ryu.controller.handler import set_ev_cls
from ryu.controller.handler import MAIN_DISPATCHER, DEAD_DISPATCHER
from ryu.controller import ofp_event
from ryu.lib import hub


class MyMonitor(simple_switch_13.SimpleSwitch13):
    def __init__(self, *args, **kwargs):
        super(MyMonitor, self).__init__(*args, **kwargs)
        self.datapaths = {}
        self.monitor_thread = hub.spawn(self._monitor_send_datapath)

    @set_ev_cls(ofp_event.EventOFPStateChange,
                [MAIN_DISPATCHER, DEAD_DISPATCHER])
    def _state_change_handler(self, ev):
        """方法用于对交换机的状态进行监听,比如上线或者下线
        例如:
        ryu.controller.handler.HANDSHAKE_DISPATCHER     交换 HELLO 讯息
        ryu.controller.handler.CONFIG_DISPATCHER       接收SwitchFeatures讯息
        ryu.controller.handler.MAIN_DISPATCHER    一般状态
        ryu.controller.handler.DEAD_DISPATCHER    联机中断"""
        datapath = ev.datapath
        if ev.state == MAIN_DISPATCHER:
            if datapath.id not in self.datapaths:
                self.datapaths[datapath.id] = datapath
        elif ev.state == DEAD_DISPATCHER:
            if datapath.id in self.datapaths:
                del self.datapaths[datapath.id]

    def _monitor_send_datapath(self):
        """周期性的换机发送请求数据
        通过调用_request_status方法"""
        while True:
            for dp in self.datapaths.values():
                self._request_status(dp)
            hub.sleep(10)

    def _request_status(self, datapath):
        """方法用于控制器向交换机发送状态请求信息,
        比如说端口状态信息请求、流表状态信息请求等
        datapath是传递的交换机参数,用于明确向哪一个交换机发送请求信息"""

        """对于方法的实现
        在ofproto_v1_3_parser中有例子进行解释
        Example::

        def send_port_stats_request(self, datapath):
            ofp = datapath.ofproto
            ofp_parser = datapath.ofproto_parser

            req = ofp_parser.OFPPortStatsRequest(datapath, 0, ofp.OFPP_ANY)
            datapath.send_msg(req)"""

        ofproto = datapath.ofproto
        parser = datapath.ofproto_parser

        req = parser.OFPFlowStatsRequest(datapath)
        datapath.send_msg(req)

        req = parser.OFPPortStatsRequest(datapath, 0, ofproto.OFPP_ANY)
        datapath.send_msg(req)

    @set_ev_cls(ofp_event.EventOFPFlowStatsReply, MAIN_DISPATCHER)
    def _flow_status_reply_handler(self, ev):
        """方法用来解析交换机返回的流表的数据,并将其在终端中打印出来"""
        body = ev.msg.body
        self.logger.info('datapath         '
                         'in-port  eth-dst           '
                         'out-port packets  bytes')
        self.logger.info('---------------- '
                         '-------- ----------------- '
                         '-------- -------- --------')
        for stat in sorted([flow for flow in body if flow.priority == 1],
                           key=lambda flow: (flow.match['in_port'],
                                             flow.match['eth_dst'])):
            self.logger.info('%016x %8x %17s %8x %8d %8d',
                             ev.msg.datapath.id,
                             stat.match['in_port'], stat.match['eth_dst'],
                             stat.instructions[0].actions[0].port,
                             stat.packet_count, stat.byte_count)

    @set_ev_cls(ofp_event.EventOFPPortStatsReply, MAIN_DISPATCHER)
    def _port_status_reply_handler(self, ev):
        """方法用来解析交换机返回的流表的数据,并将其在终端中打印出来"""
        body = ev.msg.body

        self.logger.info('datapath         port     '
                         'rx-pkts  rx-bytes rx-error '
                         'tx-pkts  tx-bytes tx-error')
        self.logger.info('---------------- -------- '
                         '-------- -------- -------- '
                         '-------- -------- --------')
        for stat in sorted(body, key=attrgetter('port_no')):
            self.logger.info('%016x %8x %8d %8d %8d %8d %8d %8d',
                             ev.msg.datapath.id, stat.port_no,
                             stat.rx_packets, stat.rx_bytes, stat.rx_errors,
                             stat.tx_packets, stat.tx_bytes, stat.tx_errors)

github地址:https://github.com/Yang-Jianlin/ryu/blob/master/ryu/app/monitor_yjl.py

 

 

    原文作者:楊木木8023
    原文地址: https://blog.csdn.net/weixin_40042248/article/details/111823005
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞