rabbitmq-死信队列-普通队列满了添加消息会怎样?-爱代码爱编程
1:死信队列架构图
2:业务描述:
发10条消息,到固定长度为6的普通队列,普通队列有哪些消息?死信队列有哪些消息?
3:结论
普通队列满了之后继续添加消息,前面的消息会挤出到死信队列中。如: 普通队列长度为6,添加10个进去后,先进去的4个会到死信队列中,最后6个消息还在普通队列中。
发送的消息 | 普通队列的消息 | 死信队列的消息 |
9876543210 | 987654 | 3210 |
4:代码实现
1、工具类代码
public class RabbitmqUtils {
private static Logger logger = LoggerFactory.getLogger(RabbitmqUtils.class);
// 死信相关队列
public static String NORMAL_EXCHANGE_NAME = "normal_exchange";
public static String NORMAL_EXCHANGE_ROUTING_KEY = "normal_routing_key";
public static String NORMAL_EXCHANGE_QUEUE_NAME = "normal_queue";
public static String DEAD_EXCHANGE_NAME = "dead_exchange";
public static String DEAD_EXCHANGE_QUEUE_NAME = "dead_queue";
public static String DEAD_EXCHANGE_ROUTING_KEY = "dead_routing_key";
// 队列持久化
public static boolean DURABLE = true;
public static Connection connection;
public static Channel channel;
static {
logger.info("getChannel begin...");
// 创建链接工厂
ConnectionFactory factory = new ConnectionFactory();
logger.info("getChannel factory:{}", factory.toString());
factory.setHost("192.168.6.8");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("123");
// 创建链接
try {
connection = factory.newConnection();
} catch (IOException e) {
throw new RuntimeException(e);
} catch (TimeoutException e) {
throw new RuntimeException(e);
}
logger.info("getChannel connection:{}", connection.toString());
try {
channel = connection.createChannel();
} catch (IOException e) {
throw new RuntimeException(e);
}
logger.info("getChannel channel:{}", channel.toString());
logger.info("getChannel success!");
}
/**
* 死信队列实战
*
* @return
* @throws IOException
*/
public static Channel createDeadExchangeAndQueue() throws IOException {
// 删除已存在的交换机
channel.exchangeDelete(RabbitmqUtils.NORMAL_EXCHANGE_NAME);
channel.exchangeDelete(RabbitmqUtils.DEAD_EXCHANGE_NAME);
// 删除已存在的队列
channel.queueDelete(RabbitmqUtils.NORMAL_EXCHANGE_QUEUE_NAME);
channel.queueDelete(RabbitmqUtils.DEAD_EXCHANGE_QUEUE_NAME);
// 创建普通交换机
channel.exchangeDeclare(
RabbitmqUtils.NORMAL_EXCHANGE_NAME,
BuiltinExchangeType.DIRECT,
RabbitmqUtils.DURABLE);
// 正常队列绑定死信队列信息并设置队列的长度
HashMap<String, Object> arguments = new HashMap<>();
arguments.put("x-dead-letter-exchange",RabbitmqUtils.DEAD_EXCHANGE_NAME);
arguments.put("x-dead-letter-routing-key",RabbitmqUtils.DEAD_EXCHANGE_ROUTING_KEY);
arguments.put("x-max-length",6);
// 创建普通队列
channel.queueDeclare(
RabbitmqUtils.NORMAL_EXCHANGE_QUEUE_NAME,
RabbitmqUtils.DURABLE,
false,false,arguments);
// 将不同队列绑定到普通交换机上
channel.queueBind(
RabbitmqUtils.NORMAL_EXCHANGE_QUEUE_NAME,
RabbitmqUtils.NORMAL_EXCHANGE_NAME,
RabbitmqUtils.NORMAL_EXCHANGE_ROUTING_KEY);
// 创建死信交换机
channel.exchangeDeclare(
RabbitmqUtils.DEAD_EXCHANGE_NAME,
BuiltinExchangeType.DIRECT,
RabbitmqUtils.DURABLE);
// 创建死信队列
channel.queueDeclare(
RabbitmqUtils.DEAD_EXCHANGE_QUEUE_NAME,
RabbitmqUtils.DURABLE,
false,false,null);
// 将死信队列绑定到交换机上
channel.queueBind(
RabbitmqUtils.DEAD_EXCHANGE_QUEUE_NAME,
RabbitmqUtils.DEAD_EXCHANGE_NAME,
RabbitmqUtils.DEAD_EXCHANGE_ROUTING_KEY);
return channel;
}
}
2、生产者代码
public class DeadProducer {
private static Logger logger = LoggerFactory.getLogger(DeadProducer.class);
public static void main(String[] args) throws IOException {
Channel channel = RabbitmqUtils.createDeadExchangeAndQueue();
// 发送消息
for (int i=0; i< 10; i++) {
String message = "info"+i;
channel.basicPublish(
RabbitmqUtils.NORMAL_EXCHANGE_NAME,
RabbitmqUtils.NORMAL_EXCHANGE_ROUTING_KEY,
null,
message.getBytes());
logger.info("发送的消息内容为:{}",message);
}
}
}
生产者输出日志:
发送的消息内容为:info0
发送的消息内容为:info1
发送的消息内容为:info2
发送的消息内容为:info3
发送的消息内容为:info4
发送的消息内容为:info5
发送的消息内容为:info6
发送的消息内容为:info7
发送的消息内容为:info8
发送的消息内容为:info9
生产者发送消息后的结果
3、普通消费者1代码
public class OrdinaryConsumer1 {
private static Logger logger = LoggerFactory.getLogger(OrdinaryConsumer1.class);
public static void main(String[] args) throws IOException {
Channel channel = RabbitmqUtils.channel;
DeliverCallback deliverCallback = (consumerTag, message) ->{
String msg = new String(message.getBody(), "UTF-8");
logger.info("普通队列接受到的消息是:{}", msg);
};
CancelCallback cancelCallback = (consumerTag)->{
logger.info(consumerTag + "消息被中断");
};
channel.basicConsume(
RabbitmqUtils.NORMAL_EXCHANGE_QUEUE_NAME,
true,
deliverCallback,
cancelCallback);
}
}
普通消费者消费的日志:
普通队列接受到的消息是:info4
普通队列接受到的消息是:info5
普通队列接受到的消息是:info6
普通队列接受到的消息是:info7
普通队列接受到的消息是:info8
普通队列接受到的消息是:info9
4、死信消费者2代码
public class DeadConsumer2 {
private static Logger logger = LoggerFactory.getLogger(DeadConsumer2.class);
public static void main(String[] args) throws IOException {
Channel channel = RabbitmqUtils.channel;
DeliverCallback deliverCallback = (consumerTag, message) ->{
String msg = new String(message.getBody(), "UTF-8");
logger.info("死信队列接受到的消息是:{}", msg);
};
CancelCallback cancelCallback = (consumerTag)->{
logger.info(consumerTag + "消息被中断");
};
channel.basicConsume(
RabbitmqUtils.DEAD_EXCHANGE_QUEUE_NAME,
true,
deliverCallback,
cancelCallback);
}
}
死信消费者消费的日志:
死信队列接受到的消息是:info0
死信队列接受到的消息是:info1
死信队列接受到的消息是:info2
死信队列接受到的消息是:info3