代码编织梦想

【README】

参考 https://blog.csdn.net/u012943767/article/details/79300673

 

【0】声明交换机,队列 与绑定

/**
 * 交换机,队列声明与绑定 
 */
public class AckDeclarer {
	/** 确认交换机 */
	public static final String ACK_EXCHANGE2 = "ACK_EXCHNAGE2";
	/** 确认队列 */
	public static final String ACK_QUEUE2 = "ACK_QUEUE2";
	/** 路由键  */
	public static final String ACK_ROUTE2 = "ACK_ROUTE2";
	
//	四种Exchange 模式
//    direct :需要生产者和消费者绑定相同的Exchange和routing key。
//    fanout:广播模式需要生产者消费者绑定相同的Exchange。
//    topic:支持模糊匹配的广播模式以点分隔,*表示一个单词,#表示任意数量(零个或多个)单词。
//    header:根据生产者和消费者的header中信息进行匹配性能较差 ,x-match [all 匹配所有/any 任意一个]。
	
	public static void main(String[] args) throws Exception {
		/* 获取连接*/
		Connection conn = RBConnectionUtil.getConn();
		// 创建信道 
		Channel channel = conn.createChannel(); 
		/* 声明交换机 */
		channel.exchangeDeclare(ACK_EXCHANGE2, BuiltinExchangeType.FANOUT);
		/* 声明队列 */
		channel.queueDeclare(ACK_QUEUE2, true, false, false, null); 
		System.out.println(String.format("声明交换机【%s】,队列【%s】成功", ACK_EXCHANGE2, ACK_QUEUE2));
		
		/* 把队列绑定到交换机 */
		channel.queueBind(ACK_QUEUE2, ACK_EXCHANGE2, ACK_ROUTE2);
		
		/* 关闭信道和连接 */
		channel.close();
		conn.close(); 
	}
}

【1】生产者

/**
 * 消息确认生产者
 */
public class AckProducer {
	public static void main(String[] args) throws Exception {
		/* 获取连接*/
		Connection conn = RBConnectionUtil.getConn();
		// 创建信道 
		Channel channel = conn.createChannel();
		
		String[] messages = new String[]{
				"first.-04130828"
				, "second..-04130828"
				, "third...-04130828"
				, "fourth....-04130828"
				, "fiveth.....-04130828"
				, "6th.....-04130828"
				, "7th.....-04130828"
				, "8th.....-04130828"
				, "9th.....-04130828"
				, "10th.....-04130828"
			};
		for (String msg : messages) {
			channel.basicPublish(AckDeclarer.ACK_EXCHANGE2, AckDeclarer.ACK_ROUTE2, null, msg.getBytes());
			System.out.println(msg + " is sent"); 
		} 
		System.out.println("消息发送完成"); 
		channel.close(); 
		conn.close();
	}
}

【2】自动确认消费者

/**
 * 自动确认消费者
 */
public class AutoAckConsumer {
	public static void main(String[] args) throws Exception {
		/* 获取连接*/
		Connection conn = RBConnectionUtil.getConn();
		// 创建信道 
		Channel channel = conn.createChannel();
		
		System.out.println("等待消费1");
		Consumer consumer = new DefaultConsumer(channel) {
			@Override
			public void handleDelivery(String consumerTag, Envelope envelope,
					BasicProperties properties, byte[] body) throws IOException {
				String msg = new String(body, "UTF-8");
				System.out.println("接收到的消息=" + msg); 
				try {
					doWork(msg);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
			
		};
		channel.basicConsume(AckDeclarer.ACK_QUEUE2, true, consumer);// 为true自动确认 
	}
	 private static void doWork(String task) throws InterruptedException {
	        for (char ch : task.toCharArray()) {
	            if (ch == '.') Thread.sleep(1000);
	        }
	    }
}

【3】手动确认消费者

/**
 * 手动确认消费者
 */
public class ManualAckConsumer {
	public static void main(String[] args) throws Exception {
		/* 获取连接*/
		Connection conn = RBConnectionUtil.getConn();
		// 创建信道 
		Channel channel = conn.createChannel();
		
		System.out.println("手动确认消费者等待消费1");
		Consumer consumer = new DefaultConsumer(channel) {
			@Override
			public void handleDelivery(String consumerTag, Envelope envelope,
					BasicProperties properties, byte[] body) throws IOException {
				String msg = new String(body, "UTF-8");
				System.out.println("接收到的消息=" + msg); 
				try {
					doWork(msg);
				} catch (InterruptedException e) {
					e.printStackTrace();
				} finally {
					System.out.println("消费done,手动确认ack,msg=" + msg);
					channel.basicAck(envelope.getDeliveryTag(), false); // 手动确认
				}
			}
		};
		// 手动确认,向rabbitmq 服务器手动发送ack成功消费标识
		channel.basicConsume(AckDeclarer.ACK_QUEUE2, false, consumer);// 为false手动确认 
	}
	 private static void doWork(String task) throws InterruptedException {
	        for (char ch : task.toCharArray()) {
	            if (ch == '.') Thread.sleep(1000);
	        }
	    }
}

手动确认消费者日志 

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
手动确认消费者等待消费1
接收到的消息=first.-04130828
消费done,手动确认ack,msg=first.-04130828
接收到的消息=second..-04130828
消费done,手动确认ack,msg=second..-04130828
接收到的消息=third...-04130828
消费done,手动确认ack,msg=third...-04130828
接收到的消息=fourth....-04130828
消费done,手动确认ack,msg=fourth....-04130828
接收到的消息=fiveth.....-04130828
消费done,手动确认ack,msg=fiveth.....-04130828
接收到的消息=6th.....-04130828
消费done,手动确认ack,msg=6th.....-04130828
接收到的消息=7th.....-04130828
消费done,手动确认ack,msg=7th.....-04130828
接收到的消息=8th.....-04130828
消费done,手动确认ack,msg=8th.....-04130828
接收到的消息=9th.....-04130828
消费done,手动确认ack,msg=9th.....-04130828
接收到的消息=10th.....-04130828
消费done,手动确认ack,msg=10th.....-04130828

 

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/PacosonSWJTU/article/details/115664570

rabbitmq自动及手动ack_m0_38140657的博客-爱代码爱编程

                        rabbitmq自动及手动ACK    mq的ack  主要是确认消息被消费者消费完成后通知服务器将队列里面的消息清除。 而如果不配置Ack的话呢,我测试过他会自动的忽略,也就是说此时的服务是no_ack=true的模式,就是说只要我发现你是消费了这个数据,至于异常不异常的,我不管了。通知Ack机制就是这

rabbitmq(三)手动ack确认-爱代码爱编程

默认情况下 spring-boot-data-amqp 是自动ACK机制,就意味着 MQ 会在消息发送完毕后,自动帮我们去ACK,然后删除消息的信息。 这样依赖就存在这样一个问题: 如果消费者处理消息需要较长时间,最好的做法

springboot集成rabbitmq手动确认消息ack(亲测)_雷军ivan的博客-爱代码爱编程

SpringBoot集成RabbitMq手动确认消息ACK(亲测) Linux部署环境 采用Docker快速部署rabbitMq环境 docker安装 安装yum-utils: sudo yum install

debug方式讲解rabbitmq的自动ack和手动ack_it乾坤的博客-爱代码爱编程

文章首发于我的个人博客,到个人博客体验更佳阅读哦 https://www.itqiankun.com/article/1564534513 介绍Rabbitmq的手动ACK和自动ACK 当消息一旦被消费者接收

SpringCloud RabbitMQ 手动ack-爱代码爱编程

配置文件 rabbitmq: customizeRoutingKey : hrmHomeSite host: 127.0.0.1 port: 5672 listener: simple: acknowledge-mode: manual #手动签收 确认模式 AcknowledgeMode.NONE

SpringBoot+RabbitMQ实现手动Consumer Ack-爱代码爱编程

目录 一、Consumer Ack的三种方式二、进入主题:SpringBoot+RabbitMQ实现手动Consumer Ack1、pom文件中导入依赖坐标2、在生产者与消费者工程yml配置文件中开启手动Ack3、在生产者工程中创建一个配置类声明队列与交换机的关系4、在消费者工程中创建一个组件监听在生产者声明的队列5、在生产者中创建一个测试类来发送

rabbitmq怎样确认是否已经消费了消息_【RabbitMq 篇六】消息确认(发送确认与接收确认)...-爱代码爱编程

前言 消息确认是保证消息传递可靠性的重要步骤,上一节我们说到持久化,持久化只能保证消息不丢失,但是如果消息如果投递失败我们怎么进行补偿操作呢?解决办法就是实现回调函数进行操作,在消息的发送和消息的消费都可以进行补偿操作,下面我们就要讲解消息确认。 正文 目录 前言 正文 消息确认种类 消息发送确认 Con

rabbitmq手动确认-ack-爱代码爱编程

yml: spring: #RibbitMQ rabbitmq: host: 192.168.31.97 port: 5672 virtual-host: / username: admin password: admin #开启发送端消息抵达队列的确认、确认被路由到队列 publisher

springboot rabbitmq ACK手动确认-爱代码爱编程

一,pom文件jar包引入 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId>