一、简介

1、简介

远程过程调用(RPC)系统,包括Java RMI,是同步的–调用者必须阻塞和等待,直到被调用的方法完成执行;因此如果不使用多个线程,就无法开发松耦合的企业应用程序;也就是说,RPC系统要求客户机和服务器同时可用;但在某些应用程序中这种紧耦合是不可能。面向消息的中间件系统为这些问题提供了解决方案:它们基于异步交互模型,并提供可以通过网络访问的消息队列的抽象,这些消息包含描述特定业务操作的格式化数据。

JMS,即Java Messaging Services(Java 消息服务),用来在特定系统内的多个单元和应用程序之间建立一个成功通信的网络。通常,它采用应用程序接口的形式,使用队列路径将消息从一个应用程序传递到另一个应用程序。

JMS旨在使开发异步处理(发送/接收)业务数据和事件的业务应用程序变得容易:它定义了一个通用的企业消息传递API,可以被各种企业消息中间件轻松有效地支持。

2、消息传递模型

JMS支持两种消息传递模型(Message Delivery Models): 点对点(Point to Point)模式和发布/订阅(Publish/Subscribe)模式。

  • Point-to-Point (Queue destination)

    在此模型中,消息从生产者传递到一个消费者;消息被传递到一个队列目的地(消息队列),然后再传递到在该队列中注册的消费者之一。

    虽然任意数量的生产者都可以向队列发送消息,但是每个消息都保证由一个消费者接收并使用。

    如果没有注册的消费者来消费消息,队列将保存消息,直到消费者注册并消费消息为止。

  • Publish/Subscribe (Topic destination)

    在这个模型中,消息从生产者发送到任意数量的消费者;消息被传递到主题目的地,然后传递给订阅了此主题的活动状态的消费者。

    任意数量的生产者都可以向主题目的地发送消息,每个消息都可以传递给任意数量的订阅者。与队列方式不同,如果没有已注册的消费者,则主题目的地将不包含消息,除非它为非活动的消费者提供了持久订阅。持久订阅表示在主题目的地注册的消费者在消息发送到主题时可以处于非活动状态。

3、可靠的消息传递

JMS定义了两种传递模式:

  • 持久消息

    保证消息被成功消费一次且只消费一次。消息不会丢失。

  • 非持久性消息

    保证最多传递一次。

消息的可靠性完全取决于对性能的取舍:消息传递的可靠性越高,实现可靠性所需的带宽和开销就越大;可以通过生成非持久性消息来最大化性能,也可以通过生成持久性消息来最大化可靠性。

4、优点

JMS有以下优点:

  • 它有一个异步消息队列,实现简单可靠。

  • 组件之间的通信是松耦合的。

  • 使具有不同消息提供程序的两个应用程序之间的通信成为可能。

  • 消息可以通过点对点发送或者广播到多个接收者,如果需要也可以将两种方式结合起来使用。

  • 只有当消息被接收方接收并确认时,消息才会从队列中删除。

  • 由于有有效的负载平衡,因此它有更大的吞吐量。

  • 由于是异步的,消息的发送频率通常比指定的时间更快,因此可以创建许多消息并通过网络再次发送,从而提高了效率。

二、架构

1、JMS程序组成

一个JMS应用程序由以下部分组成:

  • JMS Provider

JMS提供者:实现JMS规范的消息传递系统。

  • JMS Clients

JMS客户端:发送和接收消息的Java应用程序。

  • Messages

消息:用于在JMS客户端之间传递信息的对象。

  • Administered Objects

受管对象: 由管理员为使用JMS客户端而创建的预配置JMS对象,由目标点(队列和主题)和连接工厂组成。

2、JMS编程模型

JMS应用程序由一组应用程序定义的消息和一组交换消息的客户端组成,JMS客户端通过使用JMS API发送和接收消息进行交互;消息由消息头(header)、属性(properties)和消息体(body)三部分组成。

  • 消息头

每条消息都需要消息头,包含用于路由和标识消息的信息。其中一些字段由JMS提供者在生成和传递消息期间自动设置,其他字段由客户端根据消息设置。

  • 属性(可选)

提供客户端可用于筛选消息的值,一般是有关数据的附加信息,例如创建数据的进程、创建数据的时间等。可以将属性看作是消息头的扩展,并由属性名/值对组成。

  • 消息体(可选)

    包含要交换的实际数据;JMS规范定义了JMS提供者必须支持的六种类型的消息:

    • Message

      表示没有消息正文的消息。

    • StreamMessage

      消息的主体包含Java基本类型流,是按顺序编写和读取的。

    • MapMessage

      消息的主体包含一组名称/值对,没有定义条目的顺序。

    • TextMessage

      消息的主体包含Java字符串的消息,例如XML消息。

    • ObjectMessage

      消息的主体包含序列化的Java对象。

    • BytesMessage

      消息的主体包含未解释字节流的消息。

三、JMS编程

1、Open MQ

Open MQ实现了一种可靠的消息传递机制,可以不依赖同步通信将应用程序集成在一起,并在消息生产者和消息消费者之间提供缓冲。它实现了Java 消息服务(Java Message Service,JMS) API,并提供了企业级特性。

  • 下载

可以直接在官网下载Open MQ文件镜像或安装文件。

  • 安装

根据下载的安装文件直接解压或运行安装文件安装,此处使用镜像直接解压。

  • 启动

在安装目录的/mq/bin目录下,执行:

imqbrokerd -tty

启动成功会看到显示以下输出:

代理 "imqbroker@acer:7676" 就绪
  • 测试

创建pass文件,例如创建mq/config/pass,内容如下:

imq.imqcmd.password=admin

在安装目录的/mq/bin目录下,执行:

imqcmd query bkr -u admin -passfile ../config/pass

将会看到显示以下输出:

查询指定的代理:

-----------------
主机           主端口
-----------------
localhost    7676

版本                  4.5.2
实例名                 imqbroker
代理 ID
主端口                 7676
已嵌入代理               false
实例配置/数据根目录          D:\Program Files\openmq4_5_2\var\mq

系统中当前的消息数           0
系统中当前的消息大小(字节)      0

停用消息队列中的当前消息数量      0
停用消息队列中的当前消息字节总数    0

记录停用消息              false
截断停用消息队列中的消息主体      false

系统中的最大消息数           无限制 (-1)
系统中的最大消息大小          无限制 (-1)
最大消息大小              70m

自动创建队列              true
自动创建主题              true
自动创建队列的最大活动使用方数     无限制 (-1)
自动创建队列的最大备份使用方数     0

群集 ID
群集具有高可用性            false
群集代理列表 (处于活动状态)     mq://192.168.1.3:7676/
群集代理列表 (处于已配置状态)
群集主代理
群集 URL

日志等级                INFO
日志清空并重新记录间隔 (秒)     604800
日志清空并重新记录大小(字节)     268435456

成功查询代理。

2、依赖

编写程序时需要依赖javax.jms-apiimq,对应的Maven配置如下:

  • jms-api
<!-- https://mvnrepository.com/artifact/javax.jms/javax.jms-api -->
<dependency>
    <groupId>javax.jms</groupId>
    <artifactId>javax.jms-api</artifactId>
    <version>2.0.1</version>
</dependency>
  • img
<!-- https://mvnrepository.com/artifact/com.sun.messaging.mq/imq -->
<dependency>
    <groupId>com.sun.messaging.mq</groupId>
    <artifactId>imq</artifactId>
    <version>4.6-b01</version>
</dependency>

3、生产者

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

public class Producer {

	public static void main(String[] args) throws Exception{
		//1、创建连接工厂
		ConnectionFactory connFactory = new com.sun.messaging.ConnectionFactory();
		Connection connection = null;
		try {
			//2、创建连接
			connection = connFactory.createConnection();
			//3、创建会话,其中第一个参数false表示没有进行事务处理,第二个参数表示会话将在成功接收到消息时自动确认消息
			Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
			//4、创建目标对象:点对点消息传递中,目的地称为队列,在消息传递的发布/订阅模型中,称为主题
			Queue queue = new com.sun.messaging.Queue("world");
			//5、使用Session和Destination对象来创建消息生产者
			MessageProducer producer = session.createProducer(queue);
			//6、创建消息
			TextMessage message = session.createTextMessage();
			message.setText("Hello World!");
			//7、发送消息
			producer.send(message);
			System.out.println("Send Message...");
		} catch (JMSException e) {
			e.printStackTrace();
		} finally{
			if(connection != null){
				//必须关闭所有已创建的连接
				connection.close();
			}
		}
	}
}

4、消费者

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

public class Consumer {

	public static void main(String[] args) throws Exception{
		//1、创建连接工厂
		ConnectionFactory connFactory = new com.sun.messaging.ConnectionFactory();
		Connection connection = null;
		try {
			//2、创建连接
			connection = connFactory.createConnection();
			//3、创建会话,其中第一个参数false表示没有进行事务处理,第二个参数表示会话将在成功接收到消息时自动确认消息
			Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
			//4、创建目标对象:点对点消息传递中,目的地称为队列,在消息传递的发布/订阅模型中,称为主题
			Queue queue = new com.sun.messaging.Queue("world");
			//5、使用Session和Destination对象来创建消息消费者
			MessageConsumer consumer = session.createConsumer(queue);
			//启动连接
			connection.start();
			//接收消息
			Message msg = consumer.receive();
			if(msg instanceof TextMessage){
				TextMessage textMsg = (TextMessage) msg;
				System.out.printf("Received the message: %s", textMsg.getText());
			}
		} catch (JMSException e) {
			e.printStackTrace();
		} finally{
			if(connection != null){
				//必须关闭所有已创建的连接
				connection.close();
			}
		}
	}
}

JMS客户端可以以同步或异步方式消费消息:

  • 同步

    此模式下,客户端调用MessageConsumer对象的receive()方法,应用程序线程阻塞,直到方法返回;这样做的结果是,如果消息不可用,它将阻塞,直到消息可用或receive()方法超时(通过传参指定超时时间)。

  • 异步

    此模式下,客户端需要向MessageConsumer注册一个MessageListener监听,这类似回调,在session(会话)调用onMessage()方法时,客户端将消费一条消息。换句话说,应用程序的线程不会阻塞。

    对应的异步程序代码如下:

      ...
      //接收消息
      MessageListener listener = new MyListener();
      consumer.setMessageListener(listener);
      ...
    
      import javax.jms.JMSException;
      import javax.jms.Message;
      import javax.jms.MessageListener;
      import javax.jms.TextMessage;
    
      public class MyListener implements MessageListener{
    
          @Override
          public void onMessage(Message msg) {
              if(msg instanceof TextMessage){
                  TextMessage textMsg = (TextMessage) msg;
                  try {
                      System.out.printf("Received the message: %s", textMsg.getText());
                  } catch (JMSException e) {
                      e.printStackTrace();
                  }
              }
          }
      }
    

5、运行

启动生产者:

Send Message...

启动消费者:

Received the message: Hello World!
参考资料: