程序均为ryubook的案例,在此记录一下我对程序的理解和实验的执行步骤。如系统学习,可以下载ryubook进行学习,此外可以参考https://ryu.readthedocs.io/en/latest/getting_started.html,这是官方的网站,里面详细阐述了各种功能和开发步骤。
背景:
流量监控,即针对交换机加入流量监控的功能,包括监控错包,发送包的数量等一系列的操作。因为,网络已经成为许多服务或业务的基础建设,所以维护一个稳定的网络环境是必要的。但是网络问题总是不断地发生。网络发生异常的时候,必须快速的找到原因,并且尽速恢复原状。因此,为了网络的安全以及业务的正常运作,持续注意网络的健康状况是最基本的工作。
原理:
对于流量监控,首先,需要实现二层交换机的基本功能。然后,需要对交换机的状态进行监控,即交换机的上线和下线情况;需要对交换机进行数据请求,包括请求端口信息和流表信息,由于流量监控是持续性的对网络设备进行监控,所有需要开启一个线程持续对交换机发送请求信息;向交换机发送请求信息后,交换机就会回送请求报文,也就是会向控制器发送端口和流表的信息,此时就需要对数据进行解析并显示出来。这样就实现了流量监控的基本功能,工作流程如下图。
代码编写:
首先,编写类SimpleMonitor(),进行初始化,定义一个datapaths字典。
class SimpleMonitor( simple_switch_13 . SimpleSwitch13 ):
def __init__(self, *args, **kwargs):
super(SimpleMonitor , self).__init__(*args, **kwargs)
self.datapaths = {}
然后编写监听交换机状态的方法,判断交换机是否在线,然后再将交换机写入字典当中。
@set_ev_cls(ofp_event.EventOFPStateChange,[MAIN_DISPATCHER, DEAD_DISPATCHER])
#对事件进行监听,MAIN_DISPATCHER, DEAD_DISPATCHER主要用于判断交换机状态
def _state_change_handler(self, ev):
datapath = ev.datapath
if ev.state == MAIN_DISPATCHER:
if datapath.id not in self.datapaths:
self.logger.debug('register datapath: %016x', datapath.id)
self.datapaths[datapath.id] = datapath
elif ev.state == DEAD_DISPATCHER:
if datapath.id in self.datapaths:
self.logger.debug('unregister datapath: %016x', datapath.id)
del self.datapaths[datapath.id]
然后编写请求方法,实现控制器对交换机进行请求,包括请求流表和端口的信息。对于这个方法的编写,先对信息进行解析,即对版本信息和parser进行解析,然后调用ofproto_v1_3_parser里面的OFPFlowStatsRequest()和OFPPortStatsRequest()对流表和端口进行下发请求操作。
def _request_stats(self, datapath):
self.logger.debug('send stats request: %016x', datapath.id)
ofproto = datapath.ofproto
parser = datapath.ofproto_parser
req = parser.OFPFlowStatsRequest(datapath)
datapath.send_msg(req)
req = parser.OFPPortStatsRequest(datapath, 0, ofproto.OFPP_ANY)
datapath.send_msg(req)
再编写方法,周期性的向交换机发送信息。
def _monitor(self):
while True:
for dp in self.datapaths.values():
self._request_stats(dp)
hub.sleep(10) #每隔10秒发送一次请求数据
最后,是对收到的端口信息和流表信息进行解析并显示在终端设备中,解析数据的数据格式在ofproto_v1_3_parser中写的有(因为用的是1.3版本的)。例如下面这一段就是ofproto_v1_3_parser中给出的例子。
Example::
@set_ev_cls(ofp_event.EventOFPPortStatsReply, MAIN_DISPATCHER)
def port_stats_reply_handler(self, ev):
ports = []
for stat in ev.msg.body:
ports.append('port_no=%d '
'rx_packets=%d tx_packets=%d '
'rx_bytes=%d tx_bytes=%d '
'rx_dropped=%d tx_dropped=%d '
'rx_errors=%d tx_errors=%d '
'rx_frame_err=%d rx_over_err=%d rx_crc_err=%d '
'collisions=%d duration_sec=%d duration_nsec=%d' %
(stat.port_no,
stat.rx_packets, stat.tx_packets,
stat.rx_bytes, stat.tx_bytes,
stat.rx_dropped, stat.tx_dropped,
stat.rx_errors, stat.tx_errors,
stat.rx_frame_err, stat.rx_over_err,
stat.rx_crc_err, stat.collisions,
stat.duration_sec, stat.duration_nsec))
self.logger.debug('PortStats: %s', ports)
"""
代码如下
@set_ev_cls(ofp_event.EventOFPFlowStatsReply, MAIN_DISPATCHER)
def _flow_stats_reply_handler(self, ev):
body = ev.msg.body
self.logger.info('datapath '
'in-port eth-dst '
'out-port packets bytes')
self.logger.info('---------------- '
'-------- ----------------- '
'-------- -------- --------')
for stat in sorted([flow for flow in body if flow.priority == 1],
key=lambda flow: (flow.match['in_port'],
flow.match['eth_dst'])):
self.logger.info('%016x %8x %17s %8x %8d %8d',
ev.msg.datapath.id,
stat.match['in_port'], stat.match['eth_dst'],
stat.instructions[0].actions[0].port,
stat.packet_count, stat.byte_count)
@set_ev_cls(ofp_event.EventOFPPortStatsReply, MAIN_DISPATCHER)
def _port_stats_reply_handler(self, ev):
body = ev.msg.body
self.logger.info('datapath port '
'rx-pkts rx-bytes rx-error '
'tx-pkts tx-bytes tx-error')
self.logger.info('---------------- -------- '
'-------- -------- -------- '
'-------- -------- --------')
for stat in sorted(body, key=attrgetter('port_no')):
self.logger.info('%016x %8x %8d %8d %8d %8d %8d %8d',
ev.msg.datapath.id, stat.port_no,
stat.rx_packets, stat.rx_bytes, stat.rx_errors,
stat.tx_packets, stat.tx_bytes, stat.tx_errors)
至此,程序编写完成,就可以在终端中运行了。
实验步骤:
首先,打开控制器所在的终端,输入命令ryu-manager –verbose simple_monitor_13.py 执行流量监控程序,如下图
然后,在mininet中模拟一个简单的拓扑,如图所示。
运行拓扑后,用h1 ping h2,接下来观察控制器终端中显示的内容。
以下是详细代码
from operator import attrgetter
from ryu.app import simple_switch_13
from ryu.controller.handler import set_ev_cls
from ryu.controller.handler import MAIN_DISPATCHER, DEAD_DISPATCHER
from ryu.controller import ofp_event
from ryu.lib import hub
class MyMonitor(simple_switch_13.SimpleSwitch13):
def __init__(self, *args, **kwargs):
super(MyMonitor, self).__init__(*args, **kwargs)
self.datapaths = {}
self.monitor_thread = hub.spawn(self._monitor_send_datapath)
@set_ev_cls(ofp_event.EventOFPStateChange,
[MAIN_DISPATCHER, DEAD_DISPATCHER])
def _state_change_handler(self, ev):
"""方法用于对交换机的状态进行监听,比如上线或者下线
例如:
ryu.controller.handler.HANDSHAKE_DISPATCHER 交换 HELLO 讯息
ryu.controller.handler.CONFIG_DISPATCHER 接收SwitchFeatures讯息
ryu.controller.handler.MAIN_DISPATCHER 一般状态
ryu.controller.handler.DEAD_DISPATCHER 联机中断"""
datapath = ev.datapath
if ev.state == MAIN_DISPATCHER:
if datapath.id not in self.datapaths:
self.datapaths[datapath.id] = datapath
elif ev.state == DEAD_DISPATCHER:
if datapath.id in self.datapaths:
del self.datapaths[datapath.id]
def _monitor_send_datapath(self):
"""周期性的换机发送请求数据
通过调用_request_status方法"""
while True:
for dp in self.datapaths.values():
self._request_status(dp)
hub.sleep(10)
def _request_status(self, datapath):
"""方法用于控制器向交换机发送状态请求信息,
比如说端口状态信息请求、流表状态信息请求等
datapath是传递的交换机参数,用于明确向哪一个交换机发送请求信息"""
"""对于方法的实现
在ofproto_v1_3_parser中有例子进行解释
Example::
def send_port_stats_request(self, datapath):
ofp = datapath.ofproto
ofp_parser = datapath.ofproto_parser
req = ofp_parser.OFPPortStatsRequest(datapath, 0, ofp.OFPP_ANY)
datapath.send_msg(req)"""
ofproto = datapath.ofproto
parser = datapath.ofproto_parser
req = parser.OFPFlowStatsRequest(datapath)
datapath.send_msg(req)
req = parser.OFPPortStatsRequest(datapath, 0, ofproto.OFPP_ANY)
datapath.send_msg(req)
@set_ev_cls(ofp_event.EventOFPFlowStatsReply, MAIN_DISPATCHER)
def _flow_status_reply_handler(self, ev):
"""方法用来解析交换机返回的流表的数据,并将其在终端中打印出来"""
body = ev.msg.body
self.logger.info('datapath '
'in-port eth-dst '
'out-port packets bytes')
self.logger.info('---------------- '
'-------- ----------------- '
'-------- -------- --------')
for stat in sorted([flow for flow in body if flow.priority == 1],
key=lambda flow: (flow.match['in_port'],
flow.match['eth_dst'])):
self.logger.info('%016x %8x %17s %8x %8d %8d',
ev.msg.datapath.id,
stat.match['in_port'], stat.match['eth_dst'],
stat.instructions[0].actions[0].port,
stat.packet_count, stat.byte_count)
@set_ev_cls(ofp_event.EventOFPPortStatsReply, MAIN_DISPATCHER)
def _port_status_reply_handler(self, ev):
"""方法用来解析交换机返回的流表的数据,并将其在终端中打印出来"""
body = ev.msg.body
self.logger.info('datapath port '
'rx-pkts rx-bytes rx-error '
'tx-pkts tx-bytes tx-error')
self.logger.info('---------------- -------- '
'-------- -------- -------- '
'-------- -------- --------')
for stat in sorted(body, key=attrgetter('port_no')):
self.logger.info('%016x %8x %8d %8d %8d %8d %8d %8d',
ev.msg.datapath.id, stat.port_no,
stat.rx_packets, stat.rx_bytes, stat.rx_errors,
stat.tx_packets, stat.tx_bytes, stat.tx_errors)
github地址:https://github.com/Yang-Jianlin/ryu/blob/master/ryu/app/monitor_yjl.py