基于Flink流处理的动态实时(二)

基于Flink流处理的动态实时(一)的基础上 我写了centos7中安装zookpeer和kafka(单机)的原因是因为不确定公司到底是想用kafka还是rockermq,所以在还没有正式的开发的时候去玩玩kafka。之后决定用rockermq了,所以我接下来写的技术flink和rockermq的一个整合。

首先要了解一下rockermq,初学的话可以看一下初试RocketMQ消息中间件丶一个站在Java后端设计之路的男青年个人博客网站文章,并安装RocketMQ 插件方便省事。

依次运行mqnamesrv.cmd脚本和mqbroker.cmd脚本,因为会出现中文乱码的缘故,我们在rocketmq-console项目的配置文件中添加和配置端口号

server.port=9875
server.tomcat.uri-encoding=UTF-8

并且将namesrvAddr修改成自己的ip地址

rocketmq.config.namesrvAddr=xxx.xxx.xx.xx:9876

然后运行它(随便怎么搞,反正启动就行了。在idea中也行,大jar启动也行)

之后在你游览器输入http://localhost:9875/#/ 可以选择中文还是英文。

《基于Flink流处理的动态实时(二)》
《基于Flink流处理的动态实时(二)》

在然后在你的项目中加入mq的配置

spring.rocketmq.nameServer=xx.xxx.xx.xxx:9876

在创建一个消费者的一个类,测试是否能发送成功

public class TestDome  {
    /**
     * 消费者如需订阅某Topic下特定类型的消息!
     * @param args
     * @throws InterruptedException
     * @throws MQClientException
     */
    public static void main(String [] args) throws InterruptedException, MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("bk_group");//消费者群组
        consumer.setNamesrvAddr ("xx.xxx.xx.xx:9876");//mq的ip地址和端口
        consumer.setInstanceName(UUID.randomUUID().toString());
        // 请明确标明Tag:只关注自己需要的!
        consumer.subscribe("test", "*");
        consumer. registerMessageListener (new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs ,
                                                            ConsumeConcurrentlyContext context) {
                for (MessageExt messageExt:msgs) {
                    String msg = new String(messageExt.getBody()); //从mq中取得消息
                    System.out.println(Thread.currentThread().getName() + " 接收消息: " + msg);
                }

                MessageExt msg = msgs.get (0);
                /*** 对topic tag验证:只关注特定Pay*/
                if (msg. getTopic (). equals("TopicModel") && msg. getTags (). equals("Pay")) {
                    System.out.print("特定类型:" + new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.println("Consumer Started");
    }

《基于Flink流处理的动态实时(二)》
《基于Flink流处理的动态实时(二)》 上面的消费者群组,你也可以换的。
《基于Flink流处理的动态实时(二)》
《基于Flink流处理的动态实时(二)》 这是我的主题,你也可以修改的。

修改之后启动他,发送信息

《基于Flink流处理的动态实时(二)》
《基于Flink流处理的动态实时(二)》
《基于Flink流处理的动态实时(二)》
《基于Flink流处理的动态实时(二)》 启动你的main方法之后提交信息,先提交启动都可以,无所谓
《基于Flink流处理的动态实时(二)》
《基于Flink流处理的动态实时(二)》 成功消费,最上面一条是启动直接生产的。

成功搞定,等待下一次flink和rockermq的一个整合,写的不一定特别对,毕竟我也是第一次接触rockermq和flink,如果觉得我写的不对的地方,欢迎留言,互相学习学习。

    原文作者:有个小李子
    原文地址: https://zhuanlan.zhihu.com/p/52215336
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞