(手动应答)消费者没有对生产者的发送的消息进行手动确认,则 消息会重新入队,直到被消费者重新消费才进行删除
前台操作 https://blog.csdn.net/Ying_ph/article/details/132607143
net start Rabbitmq net stop Rabbitmq
对于非持久化队列 必须先删除队列 才能进行重建持久化队列 否则会报错
- ``Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'durable' for queue 'test-persist' in vhost '/': received 'true' but current is 'false', class-id=50, method-id=10)
rabbitmq 默认消息分发 是轮训的 ,,但是可以使用 不公平分发 能者多劳
shell
channel.basicQos(1);
rabbitmq 通过 发布/确认机制 ,消息持久化,队列持久化 实现 消息不丢失的目的
发布确认
逐个确认
批量确认
异步批量确认
编程逻辑复杂 但是性价比高 可靠 效率高
一个队列里的消息只能被消费一次
如何实现 一个消息被消费多次(通过 交换机 进行路由绑定) 几种不同类型的交换机
- fanout 扇出 发布订阅 (1个发布者 多个订阅者 广播类型 )
java
/*
*
* 发布 订阅 1 -》 多
* */
public class Producer03 {
private static String EXCHANGE_NAME="logs";
public static void main(String[] args) throws Exception {
Channel channels =null;
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
try {
Connection connection = connectionFactory.newConnection();
Optional<Channel> channel = connection.openChannel();
channels = channel.get();
}catch (Exception e){
e.printStackTrace();
}
channels.exchangeDeclare(EXCHANGE_NAME,"fanout");
String data ="";
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()){
data = scanner.next();
channels.basicPublish(EXCHANGE_NAME,"",null,data.getBytes(StandardCharsets.UTF_8));
System.out.println("消息发送成功");
}
}
}
--- 订阅者 可以有多个 都可以收到消息
/**
* rabbitmq 消费者
* @author jd
*
*/
public class Consumer04 {
private static String EXCHANGE_NAME="logs";
/**
* @param args
*/
public static void main(String[] args) throws Exception{
Channel channels =null;
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
try {
Connection connection = connectionFactory.newConnection();
Optional<Channel> channel = connection.openChannel();
channels = channel.get();
}catch (Exception e){
e.printStackTrace();
}
/* 扇出交换机*/
channels.exchangeDeclare(EXCHANGE_NAME,"fanout");
/* 临时队列 队列名称随机
* 消费者 断开连接 自动删除
* */
channels.queueDeclare("ATP",false,false,false,null);
/* String queue, String exchange, String routingKey */
channels.queueBind("ATP",EXCHANGE_NAME,"");
System.out.println("queue:"+"ATP"+":Consumer04等待接受消息:");
DeliverCallback callback = (consumerTag, message)->{
System.out.println("consumerTag:"+consumerTag);
System.out.println("msg:"+new String(message.getBody()));
};
CancelCallback cancelCallback = (consumerTag) ->{
};
channels.basicConsume("ATP",true,callback,cancelCallback);
}
direct
topic
死信消息