RocketMQ之手动创建Topic、发送消息(同步&异步)

目录

手动创建topic

发送消息(同步)

发送消息(异步)

 

手动创建topic

 

topic是broker中的一个个分组,如果设置topic自动创建的话如果创建多了,可能会影响电脑的运行速度,所以我们来手动创建。

首先我们先在Demo项目中新建一个TopicDemo类,关于Demo项目的搭建在这篇博客中有提到

https://blog.csdn.net/Delicious_Life/article/details/104969043

package cn.itcast.rocketmq.topic;


import org.apache.rocketmq.client.producer.DefaultMQProducer;

public class TopicDemo {
    public static void main(String[] args) throws Exception {
        //分组名haoke这个可以任意设置
        DefaultMQProducer producer = new DefaultMQProducer("haoke");

        //设置nameserver的地址
        producer.setNamesrvAddr("192.168.62.132:9876");

        //启动生产者
        producer.start();

        /**
         * 创建topic,参数分别是:borker的名称,topic的名称,queue的数量
         * broker要和虚拟机broker.conf配置文件中brokername的名字一致
         * newTopic的名字随便起,queueNum8的意思是新建的消息队列数为8个
         */
        producer.createTopic("broker_haoke_im","my-topic",8);
        System.out.println("topic创建成功!");
        producer.shutdown();
    }
}

 

然后我们启动这个测试类,我们看到它提示topic创建成功

《RocketMQ之手动创建Topic、发送消息(同步&异步)》

这时候我们再打开rocket-console看看能不能找到这个topic

通过docker启动rocket-console

docker run -e "JAVA_OPTS=-Drocketmq.namesrv.addr=192.168.62.132:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false" -p 8082:8080 -t styletang/rocketmq-console-ng:1.0.0

输入访问地址http://192.168.62.132:8082/#/

可以看到在主题的最下面有了我们的新topic

《RocketMQ之手动创建Topic、发送消息(同步&异步)》

 

 

发送消息(同步)

 

发送消息的数据结构如下

《RocketMQ之手动创建Topic、发送消息(同步&异步)》

 

我们新建一个类测试同步发送消息

package cn.itcast.rocketmq.sendmsg;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

public class SyncProducer {
    public static void main(String[] args) throws Exception {
        //分组名haoke这个可以任意设置
        DefaultMQProducer producer = new DefaultMQProducer("haoke");

        //设置nameserver的地址
        producer.setNamesrvAddr("192.168.62.132:9876");

        //启动生产者
        producer.start();

        //发送消息
        String msg = "我的第一个消息!";
        Message message = new Message("my-topic", "mytag", msg.getBytes("UTF-8"));
        SendResult sendResult = producer.send(message);
        System.out.println("消息id: " + sendResult.getMsgId());
        System.out.println("消息队列: " + sendResult.getMessageQueue());
        System.out.println("消息offset值: " + sendResult.getQueueOffset());
        System.out.println(sendResult);
        producer.shutdown();
    }
}

运行测试类,可以看到console输出queueId=5

《RocketMQ之手动创建Topic、发送消息(同步&异步)》

我们在RocketMq-console中点击“mytopic”中的状态一栏,可以发现第五个队列更新了消息记录。

《RocketMQ之手动创建Topic、发送消息(同步&异步)》

我们也可以根据控制台输出的msgId查询消息

《RocketMQ之手动创建Topic、发送消息(同步&异步)》

可以看到消息的详细内容如下

《RocketMQ之手动创建Topic、发送消息(同步&异步)》

 

 

 发送消息(异步)

 

注意异步消息不能直接shutdown,得等到消息完成后才能shutdown,否则会报错。

package cn.itcast.rocketmq.sendmsg;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

public class AsyncProducer {

    public static void main(String[] args) throws Exception {
        //分组名haoke这个可以任意设置
        DefaultMQProducer producer = new DefaultMQProducer("haoke");

        //设置nameserver的地址
        producer.setNamesrvAddr("192.168.62.132:9876");

        //启动生产者
        producer.start();

        // 发送消息
        String msg = "我的第一个异步发送消息!";
        Message message = new Message("my-topic", msg.getBytes("UTF-8"));
        producer.send(message, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.println("发送成功了!" + sendResult);
                System.out.println("消息id:" + sendResult.getMsgId());
                System.out.println("消息队列:" + sendResult.getMessageQueue());
                System.out.println("消息offset值:" + sendResult.getQueueOffset());
            }

            @Override
            public void onException(Throwable e) {
                System.out.println("消息发送失败!" + e);
            }
        });

//        producer.shutdown();
    }
}

消息发送成功了

《RocketMQ之手动创建Topic、发送消息(同步&异步)》

我们再用Id在rocketmq-console中查看下

《RocketMQ之手动创建Topic、发送消息(同步&异步)》

 

    原文作者:我能在河边钓一整天的鱼
    原文地址: https://blog.csdn.net/Delicious_Life/article/details/104998043
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞