JMS消息事务-爱代码爱编程
消息事务
消息事务,是保证消息传递原子性的一个重要特征,和JDBC的事务特征类似。
一个事务性发送,其中一组消息要么能够全部保证到达服务器,要么都不到达服务器。
生产者、消费者与消息服务器直接都支持事务性;
ActionMQ的事务主要偏向在生产者的应用。
一、生产者事务:
方式一:
/**
* 加入事务
*/
@Test
public void txSender(){
//获取连接工厂
ConnectionFactory connectionFactory = jmsMessagingTemplate.getConnectionFactory();
Session session = null;
try {
//创建连接
Connection connection = connectionFactory.createConnection();
//参数一:是否启动消息事务
session = connection.createSession(true,Session.AUTO_ACKNOWLEDGE);
//创建生产者
MessageProducer messageProducer = session.createProducer(session.createQueue(name));
for (int i = 0; i < 10; i++) {
//模拟异常
if (i==4){
int a = 10/0;
}
TextMessage textMessage = session.createTextMessage("msg"+i);
messageProducer.send(textMessage);
}
//注意:一旦开启事务发送,那么就必须使用commit方法进行事务提交,否则消息无法到达MQ服务器
session.commit();
} catch (JMSException e) {
e.printStackTrace();
//消息事务回滚
try {
session.rollback();
} catch (JMSException e1) {
e1.printStackTrace();
}
}
}
方式二
package com.test.producer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
/**
* 消息发送业务类
*/
@Service
public class MessageService {
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;
@Value("${activemq.name}")
private String name;
@Transactional //对消息发送加入事务管理(同时也对JDBC数据库事务生效)
public void sendMsg(){
for (int i = 0; i < 10; i++) {
//模拟异常
if (i==4){
int a = 10/0;
}
jmsMessagingTemplate.convertAndSend(name,"msg --"+i);
}
}
}
@Autowired
private MessageService messageService;
/**
* 事务性发送 方案2: spring的JmsTransactionManager
*/
@Test
public void txSender2(){
messageService.sendMsg();
}
二、消费者事务
@Component //放入IOC容器
public class MsgListener {
/**
* 用于接收消息的方法
* destination:队列的名称或主题的名称
*/
@JmsListener(destination = "${activemq.name}")
public void receiveMsg (Message message, Session session){
if(message instanceof TextMessage){
//接收文本消息
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("接收消息:"+textMessage.getText());
int i =10/0;
//提交事务
session.commit();
} catch (JMSException e) {
e.printStackTrace();
try {
session.rollback();
} catch (JMSException e1) {
e1.printStackTrace();
}
}
}
}
}
消息消费6次失败后,消息被存储在death queue中
版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。 本文链接: https://blog.csdn.net/weixin_43732943/article/details/110847068