storm引擎计算出一批中间告警结果,会发送一条kafka消息给告警入库服务,告警入库服务接收到kafka消息后读取中间告警文件,经过一系列处理后把最终告警存入mysql中。
实际上,中间告警结果可能有重复告警、错误告警、无用告警,告警入库服务会过滤,压缩中间告警,把用户关心的告警存入数据库。过滤的步骤较多,并且客户关心的告警可能会随时变化,写死的告警过滤很快就无法满足应用场景,这种场景下使用过滤器模式则很好满足业务上的不确定性欲扩展性。
告警入库服务涉及消息过滤和告警过滤,下面我们以消息过滤器来讲一下过滤器模式。
1、消息过滤器接口
package com.coshaho.learn.filter; /** * * IMessageFilter.java Create on 2017年5月13日 上午12:43:56 * * 类功能说明: 告警消息过滤器 * * Copyright: Copyright(c) 2013 * Company: COSHAHO * @Version 1.0 * @Author coshaho */ public abstract class IMessageFilter implements Comparable<IMessageFilter> { public int priority() { return 0; } public abstract boolean doFilter(Message msg); public int compareTo(IMessageFilter arg0) { return priority() - arg0.priority(); } }
2、时间过滤器
package com.coshaho.learn.filter; import org.springframework.stereotype.Component; /** * * TimeFilter.java Create on 2017年5月13日 上午12:44:25 * * 类功能说明: 时间过滤 * * Copyright: Copyright(c) 2013 * Company: COSHAHO * @Version 1.0 * @Author coshaho */ @Component public class TimeFilter extends IMessageFilter { public int priority() { return 1; } public boolean doFilter(Message msg) { if(msg.getHour() < 8 || msg.getHour() > 17) { System.out.println("Time filter false, message is " + msg); return false; } System.out.println("Time filter true, message is " + msg); return true; } }
3、级别过滤器
package com.coshaho.learn.filter; import org.springframework.stereotype.Component; /** * * ServerityFilter.java Create on 2017年5月13日 上午12:44:13 * * 类功能说明: 级别过滤 * * Copyright: Copyright(c) 2013 * Company: COSHAHO * @Version 1.0 * @Author coshaho */ @Component public class ServerityFilter extends IMessageFilter { public int priority() { return 2; } public boolean doFilter(Message msg) { if(msg.getSeverity() == 0) { System.out.println("Severity filter false, message is " + msg); return false; } System.out.println("Severity filter true, message is " + msg); return true; } }
4、类型过滤器
package com.coshaho.learn.filter; import org.springframework.stereotype.Component; /** * * TypeFilter.java Create on 2017年5月13日 上午12:44:36 * * 类功能说明: 类型过滤 * * Copyright: Copyright(c) 2013 * Company: COSHAHO * @Version 1.0 * @Author coshaho */ @Component public class TypeFilter extends IMessageFilter{ public int priority() { return 3; } public boolean doFilter(Message msg) { if(msg.getType() < 3) { System.out.println("Type filter false, message is " + msg); return false; } System.out.println("Type filter true, message is " + msg); return false; } }
5、消息监听服务
package com.coshaho.learn.filter; import java.util.Collections; import java.util.List; import org.springframework.context.ApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; import org.springframework.stereotype.Component; /** * * MessageListener.java Create on 2017年5月13日 上午12:44:49 * * 类功能说明: 告警消息监听 * * Copyright: Copyright(c) 2013 * Company: COSHAHO * @Version 1.0 * @Author coshaho */ @Component public class MessageListener { public static void main(String[] args) { @SuppressWarnings("resource") ApplicationContext context = new ClassPathXmlApplicationContext("classpath:spring.xml"); MessageListener listener = (MessageListener)context.getBean("messageListener"); Message msg = new Message(); msg.setHour(12); msg.setName("coshaho"); msg.setType(5); msg.setSeverity(3); listener.listen(msg); } public void listen(Message msg) { excute(msg); } private boolean excute(Message msg) { @SuppressWarnings("unchecked") List<IMessageFilter> filters = (List<IMessageFilter>)SpringUtils.getBeansOfType(IMessageFilter.class); Collections.sort(filters); for(IMessageFilter filter : filters) { if(!filter.doFilter(msg)) { return false; } } return true; } }
6、Spring工具类
package com.coshaho.learn.filter; import java.util.ArrayList; import java.util.List; import java.util.Map; import org.springframework.beans.BeansException; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.stereotype.Service; @Service public class SpringUtils implements ApplicationContextAware { private static ApplicationContext context; public void setApplicationContext(ApplicationContext context) throws BeansException { SpringUtils.context = context; } @SuppressWarnings({ "rawtypes", "unchecked" }) public static List getBeansOfType(Class clazz) { Map map = context.getBeansOfType(clazz); return new ArrayList(map.values()); } }
7、消息类
package com.coshaho.learn.filter; /** * * Message.java Create on 2017年5月13日 上午12:43:37 * * 类功能说明: 告警消息 * * Copyright: Copyright(c) 2013 * Company: COSHAHO * @Version 1.0 * @Author coshaho */ public class Message { private String name; private int hour; private int type; private int severity; public String getName() { return name; } public void setName(String name) { this.name = name; } public int getHour() { return hour; } public void setHour(int hour) { this.hour = hour; } public int getType() { return type; } public void setType(int type) { this.type = type; } public int getSeverity() { return severity; } public void setSeverity(int severity) { this.severity = severity; } @Override public String toString() { return "Message [name=" + name + ", hour=" + hour + ", type=" + type + ", severity=" + severity + "]"; } }
可以看到,过滤器模式可以很方便的扩展过滤业务。