基于JMS(Java Message Service)的消息中间件----ActiveMQ

消息中间件利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息排队模型,它可以在分布式环境下扩展进程间的通信。而Java中的消息中间件则是JMS—Java Message Service.在常见的消息中间件类型ActiveMQ无疑是不错的选择。接下来我们先简单介绍一下什么是JMS及ActiveMQ而后我们介绍一下ActiveMQ中的两种模式 — 队列模型以及主题模式。

JMS简介

  1. JMS基本概念
    JMS(Java Message Service)是访问企业消息系统的标准API,它便于消息系统中的Java应用程序进行消息交换,并且通过提供标准的产生、发送、接收消息的接口简化企业应用的开发。

JMS应用由以下几部分组成:
JMS provider :是一个消息系统,它实现了JMS 接口并提供管理和控制的功能。
JMS clients :是用Java语言写的一些程序和组件,它们产生和使用消息。
Messages :是在JMS clients之间传递的消息的对象。
Administered objects :是由使用JMS clients 的人生成的预选设置好的JMS 对象。有两种这样的对象:
destinations和connection factories。

  1. JMS基本功能
    JMS是用于和面向消息的中间件相互通信的应用程序接口。它既支持点对点(point-to-point)的域,又支持发布/订阅 (publish/subscribe)类型的域,并且提供对下列类型的支持:经认可的消息传递,事务型消息的传递,一致性消息和具有持久性的订阅者支 持。JMS还提供了另一种方式来对您的应用与旧的后台系统相集成。

  2. 消息服务类型
    1) point-to-point (PTP)方式:点到点的模型。消息由一个JMS客户机(发布者)发送到服务器上的一个目的地,即一个队列(queue)。而另一个JMS客户机(订阅者)则可以访问这个队列,并从该服务器获取这条消息。
    2) publish/subscribe (pub/sub)方式:发布-订阅模型。这里仍然是由一个JMS客户机将一条消息发布到服务器上的一个目的地上,但是这次这个目的地叫做一个主题 (topic),可有多个订阅者去访问该消息。消息将一直维持在主题中,直到这个主题的所有订阅者都取走了该消息的一个副本。消息也包括了一个参数,用于 定义了该消息的耐久性(它能够在服务器上等待订阅者多长时间)。

ActiceMQ简介:

    ActiveMQ是Apache所提供的一个开源的消息系统,完全采用Java来实现,因此,它能很好地支持J2EE提出的JMS(Java Message Service,即Java消息服务)规范。JMS是一组Java应用程序接口,它提供消息的创建、发送、读取等一系列服务。JMS提供了一组公共应用程序接口和响应的语法,类似于Java数据库的统一访问接口JDBC,它是一种与厂商无关的API,使得Java程序能够与不同厂商的消息组件很好地进行通信。

JMS支持两种消息发送和接收模型。一种称为P2P(Ponit to Point)模型,即采用点对点的方式发送消息。P2P模型是基于队列的,消息生产者发送消息到队列,消息消费者从队列中接收消息,队列的存在使得消息的异步传输称为可能,P2P模型在点对点的情况下进行消息传递时采用。下面两张图来描述一下这两种模型:
《基于JMS(Java Message Service)的消息中间件----ActiveMQ》
《基于JMS(Java Message Service)的消息中间件----ActiveMQ》

Java实现ActiveMQ两种模型

【1】队列模型

  • 队列模型之生产者端

    package com.imooc.jms.queue;
    import java.awt.font.TextMeasurer;
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.MessageProducer;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    import org.apache.activemq.ActiveMQConnectionFactory;

    public class AppProducer {

       private static final String url = "tcp://192.168.133.1:61616";
       private static final String queueName = "queue-test";
       
       public static void main(String[] args) throws JMSException {
           
           //1.创建connectionFactory
           ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
           
           //2.创建connection
           Connection connection = connectionFactory.createConnection();
           
           //3.  启动链接
           connection.start();
           
           //4. 创建会话
           Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
           
           //5.  创建目标
           Destination destination =  session.createQueue(queueName);
           
           //6.  创建生产者向目标发送消息
           MessageProducer  producer=  session.createProducer(destination);
           
           for(int i = 0 ;i<100;i++){
               //7.  创建消息
               TextMessage textMessage = session.createTextMessage("test"+i);
               //8.  发布消息
               producer.send(textMessage);
               System.out.println("发送消息:"+textMessage.getText());
           }    
           //9. 关闭连接
           connection.close();
       }

    }

说明:此为队列模型中的生产者端,下面就该端代码进行说明:
new 一个ConnectionFactory –》 new 一个connection —》启动这个连接—-》创建一个会话—-》创建消息发送目的地—》创建一个消息生产者向目的地发送消息–》发送消息—-》关闭连接

  • 队列模型之消费者端

    package com.imooc.jms.queue;
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageConsumer;
    import javax.jms.MessageListener;
    import javax.jms.MessageProducer;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    import org.apache.activemq.ActiveMQConnectionFactory;

    public class AppConsumer {

       private static final String url = "tcp://192.168.133.1:61616";
       private static final String queueName = "queue-test";
       
       public static void main(String[] args) throws JMSException {
           
           //1.创建connectionFactory
           ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
           
           //2.创建connection
           Connection connection = connectionFactory.createConnection();
           
           //3.  启动链接
           connection.start();
           
           //4. 创建会话
           Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
           
           //5.  创建目标
           Destination destination =  session.createQueue(queueName);
           
          //6.  创建一个消费者
           MessageConsumer consumer =  session.createConsumer(destination);
           
           //7.  创建一个监听器
           consumer.setMessageListener(new MessageListener() {
               
               public void onMessage(Message message) {
                   
                   TextMessage textMessage = (TextMessage)message;
                   try {
                       System.out.println("接受消息:"+textMessage.getText());
                   } catch (JMSException e) {
                       // TODO Auto-generated catch block
                       e.printStackTrace();
                   }
                   
               }
           });
           
           //9. 关闭连接
           //connection.close();
       }

    }

说明:此为队列模型中的消费者端,下面就该端代码进行说明:
new 一个ConnectionFactory –》 new 一个connection —》启动这个连接—-》创建一个会话—-》创建消息发送目的地—》创建一个消息消费者–》创建一个消息监听器监听消息—-》关闭连接

【2】主题模型

  • 主题模型之生产者端

    package com.imooc.jms.topic;
    import java.awt.font.TextMeasurer;
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.MessageProducer;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    import org.apache.activemq.ActiveMQConnectionFactory;

    public class AppProducer {

       private static final String url = "tcp://192.168.133.1:61616";
       private static final String topicName = "topic-test";
       
       public static void main(String[] args) throws JMSException {
           
           //1.创建connectionFactory
           ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
           
           //2.创建connection
           Connection connection = connectionFactory.createConnection();
           
           //3.  启动链接
           connection.start();
           
           //4. 创建会话
           Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
           
           //5.  创建目标
           Destination destination =  session.createTopic(topicName);
           
           //6.  创建生产者向目标发送消息
           MessageProducer  producer=  session.createProducer(destination);
           
           for(int i = 0 ;i<100;i++){
               //7.  创建消息
               TextMessage textMessage = session.createTextMessage("test"+i);
               //8.  发布消息
               producer.send(textMessage);
               System.out.println("发送消息:"+textMessage.getText());
           }
           
           //9. 关闭连接
           connection.close();
       }

    }

  • 主题模型之消费者端

    package com.imooc.jms.topic;
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageConsumer;
    import javax.jms.MessageListener;
    import javax.jms.MessageProducer;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    import org.apache.activemq.ActiveMQConnectionFactory;

    public class AppConsumer {

       private static final String url = "tcp://192.168.133.1:61616";
       private static final String topicName = "topic-test";
       
       public static void main(String[] args) throws JMSException {
           
           //1.创建connectionFactory
           ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
           
           //2.创建connection
           Connection connection = connectionFactory.createConnection();
           
           //3.  启动链接
           connection.start();
           
           //4. 创建会话
           Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
           
           //5.  创建目标
           Destination destination =  session.createTopic(topicName);
           
          //6.  创建一个消费者
           MessageConsumer consumer =  session.createConsumer(destination);
           
           //7.  创建一个监听器
           consumer.setMessageListener(new MessageListener() {
               
               public void onMessage(Message message) {
                   
                   TextMessage textMessage = (TextMessage)message;
                   try {
                       System.out.println("接受消息:"+textMessage.getText());
                   } catch (JMSException e) {
                       // TODO Auto-generated catch block
                       e.printStackTrace();
                   }
                   
               }
           });
           
           //9. 关闭连接
           //connection.close();
       }

    }

最后创建项目源码地址:
链接:http://pan.baidu.com/s/1bo1PSib 密码:khyc

    原文作者:james
    原文地址: https://segmentfault.com/a/1190000012096924
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞