目录
手动创建topic
topic是broker中的一个个分组,如果设置topic自动创建的话如果创建多了,可能会影响电脑的运行速度,所以我们来手动创建。
首先我们先在Demo项目中新建一个TopicDemo类,关于Demo项目的搭建在这篇博客中有提到
https://blog.csdn.net/Delicious_Life/article/details/104969043
package cn.itcast.rocketmq.topic;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
public class TopicDemo {
public static void main(String[] args) throws Exception {
//分组名haoke这个可以任意设置
DefaultMQProducer producer = new DefaultMQProducer("haoke");
//设置nameserver的地址
producer.setNamesrvAddr("192.168.62.132:9876");
//启动生产者
producer.start();
/**
* 创建topic,参数分别是:borker的名称,topic的名称,queue的数量
* broker要和虚拟机broker.conf配置文件中brokername的名字一致
* newTopic的名字随便起,queueNum8的意思是新建的消息队列数为8个
*/
producer.createTopic("broker_haoke_im","my-topic",8);
System.out.println("topic创建成功!");
producer.shutdown();
}
}
然后我们启动这个测试类,我们看到它提示topic创建成功
这时候我们再打开rocket-console看看能不能找到这个topic
通过docker启动rocket-console
docker run -e "JAVA_OPTS=-Drocketmq.namesrv.addr=192.168.62.132:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false" -p 8082:8080 -t styletang/rocketmq-console-ng:1.0.0
输入访问地址http://192.168.62.132:8082/#/
可以看到在主题的最下面有了我们的新topic
发送消息(同步)
发送消息的数据结构如下
我们新建一个类测试同步发送消息
package cn.itcast.rocketmq.sendmsg;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
public class SyncProducer {
public static void main(String[] args) throws Exception {
//分组名haoke这个可以任意设置
DefaultMQProducer producer = new DefaultMQProducer("haoke");
//设置nameserver的地址
producer.setNamesrvAddr("192.168.62.132:9876");
//启动生产者
producer.start();
//发送消息
String msg = "我的第一个消息!";
Message message = new Message("my-topic", "mytag", msg.getBytes("UTF-8"));
SendResult sendResult = producer.send(message);
System.out.println("消息id: " + sendResult.getMsgId());
System.out.println("消息队列: " + sendResult.getMessageQueue());
System.out.println("消息offset值: " + sendResult.getQueueOffset());
System.out.println(sendResult);
producer.shutdown();
}
}
运行测试类,可以看到console输出queueId=5
我们在RocketMq-console中点击“mytopic”中的状态一栏,可以发现第五个队列更新了消息记录。
我们也可以根据控制台输出的msgId查询消息
可以看到消息的详细内容如下
发送消息(异步)
注意异步消息不能直接shutdown,得等到消息完成后才能shutdown,否则会报错。
package cn.itcast.rocketmq.sendmsg;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
public class AsyncProducer {
public static void main(String[] args) throws Exception {
//分组名haoke这个可以任意设置
DefaultMQProducer producer = new DefaultMQProducer("haoke");
//设置nameserver的地址
producer.setNamesrvAddr("192.168.62.132:9876");
//启动生产者
producer.start();
// 发送消息
String msg = "我的第一个异步发送消息!";
Message message = new Message("my-topic", msg.getBytes("UTF-8"));
producer.send(message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("发送成功了!" + sendResult);
System.out.println("消息id:" + sendResult.getMsgId());
System.out.println("消息队列:" + sendResult.getMessageQueue());
System.out.println("消息offset值:" + sendResult.getQueueOffset());
}
@Override
public void onException(Throwable e) {
System.out.println("消息发送失败!" + e);
}
});
// producer.shutdown();
}
}
消息发送成功了
我们再用Id在rocketmq-console中查看下