Topic Subsriber模式
订阅模式分为非持久订阅(Non-Durable Topic Subscribers)和持久订阅模式(Durable Topic Subscribers)
非持久订阅(Non-Durable Topic Subscribers)
- 生产者生产消息,谁订阅,谁就会收到
- 生产者生产消息,没有人订阅,消息废弃,当consumer启动连接时,废弃的消息不会再次被收到
代码如下:
package com.pgy.jms.sub;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* @Date: Created in 27/12/2017 1:01 PM
* @Author: pengganyu
* @Desc:
*/
public class Producer {
public static void main(String[] args) throws JMSException, InterruptedException {
ConnectionFactory factory = new ActiveMQConnectionFactory(
ActiveMQConnectionFactory.DEFAULT_USER, ActiveMQConnectionFactory.DEFAULT_PASSWORD,
ActiveMQConnectionFactory.DEFAULT_BROKER_URL);
Connection connection = factory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic("hello");
MessageProducer producer = session.createProducer(topic);
int i = 0;
while (true) {
Thread.sleep(3000);
producer.send(session.createTextMessage("hello" + i++));
}
}
}
package com.pgy.jms.sub;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* @Date: Created in 27/12/2017 2:19 PM
* @Author: pengganyu
* @Desc:
*/
public class Cousmer {
public static void main(String[] args) throws JMSException {
TopicConnectionFactory factory = new ActiveMQConnectionFactory(
ActiveMQConnectionFactory.DEFAULT_USER, ActiveMQConnectionFactory.DEFAULT_PASSWORD,
ActiveMQConnectionFactory.DEFAULT_BROKER_URL);
Connection connection = factory.createTopicConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic("hello");
MessageConsumer consumer = session.createConsumer(topic);
// consumer.setMessageListener(message -> System.out.println(message));
while (true) {
TextMessage textMessage = (TextMessage) consumer.receive();
System.out.println(textMessage.getText());
}
}
}
说明
- 生产者在不停地生产消息,如果消费者不启动,消费者是无法接收到消息的;(运行Producer)
- 消费者启动连接后,之前的消息是不会被接收到,启动后才可以接收到当前生产出来的消息(运行Consumer)
- 消费者断开连接后,无法接收到生产者生产的消息(停止consumer)
- 消费者再次连接后,之前丢失的消息无法继续再收到,只能接收到当前生产出来的消息(启动consumer)
持久订阅(Durable Topic Subscribers)
生产者的代码与非持久的相同,consumer的代码如下
package com.pgy.jms.sub;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* @Date: Created in 27/12/2017 2:19 PM
* @Author: pengganyu
* @Desc:
*/
public class Cousmer1 {
public static void main(String[] args) throws JMSException {
TopicConnectionFactory factory = new ActiveMQConnectionFactory(
ActiveMQConnectionFactory.DEFAULT_USER, ActiveMQConnectionFactory.DEFAULT_PASSWORD,
ActiveMQConnectionFactory.DEFAULT_BROKER_URL);
Connection connection = factory.createTopicConnection();
connection.setClientID("consumer1");
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic("hello");
MessageConsumer consumer = session.createDurableSubscriber(topic, "consumer1");
// consumer.setMessageListener(message -> System.out.println(message));
while (true) {
TextMessage textMessage = (TextMessage) consumer.receive();
System.out.println(textMessage.getText());
}
}
}
package com.pgy.jms.sub;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* @Date: Created in 27/12/2017 2:19 PM
* @Author: pengganyu
* @Desc:
*/
public class Cousmer2 {
public static void main(String[] args) throws JMSException {
TopicConnectionFactory factory = new ActiveMQConnectionFactory(
ActiveMQConnectionFactory.DEFAULT_USER, ActiveMQConnectionFactory.DEFAULT_PASSWORD,
ActiveMQConnectionFactory.DEFAULT_BROKER_URL);
Connection connection = factory.createTopicConnection();
// 与非持久订阅相比,需要设置ClientID
connection.setClientID("consumer2");
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic("hello");
// 与非持久订阅相比,需要用持久订阅方法去创建消费者
MessageConsumer consumer = session.createDurableSubscriber(topic, "consumer2");
while (true) {
TextMessage textMessage = (TextMessage) consumer.receive();
System.out.println(textMessage.getText());
}
}
}
说明
- 生产者在不停地生产消息,此时若没有人订阅,消息直接废弃(启动Producer)
- 消费者1启动,无法接收到之前Producer生产的消息,只能接收到当前的消息(启动Consumer1)
- 消费者2启动,也无法接收之前Producer生产的消息,只能接收到当前的消息(启动Consumer2)
- 中断消费者2,消费者1继续接收消息,消费者2无法接收消息(停止Consumer2)
- 启动消费者2,消费者1继续接收消息,消费者2可以接收到之前停止后丢失的消息,并可以继续接收当前消息(启动Consumer2)
- ActiveMQ是通过ClientID判断消息是否已经发给连接点,若消费者的ClientID相同,那么只会被某一个消费者接收到消息,而另外一个会报错
- 与非持久订阅模式的区别仅为设置了ClinetID及创建消费者使用createDurableSubscriber方法