Skip to content
  1. (手动应答)消费者没有对生产者的发送的消息进行手动确认,则 消息会重新入队,直到被消费者重新消费才进行删除

  2. 前台操作 https://blog.csdn.net/Ying_ph/article/details/132607143

  3. net start Rabbitmq net stop Rabbitmq

  4. 对于非持久化队列 必须先删除队列 才能进行重建持久化队列 否则会报错

    1. ``Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'durable' for queue 'test-persist' in vhost '/': received 'true' but current is 'false', class-id=50, method-id=10)
  5. rabbitmq 默认消息分发 是轮训的 ,,但是可以使用 不公平分发 能者多劳

shell
channel.basicQos(1);


rabbitmq  通过 发布/确认机制 ,消息持久化,队列持久化 实现 消息不丢失的目的

发布确认 
逐个确认
批量确认
异步批量确认
编程逻辑复杂   但是性价比高 可靠 效率高

一个队列里的消息只能被消费一次

image.pngimage.png

如何实现 一个消息被消费多次(通过 交换机 进行路由绑定) image.png 几种不同类型的交换机

  1. fanout 扇出 发布订阅 (1个发布者 多个订阅者 广播类型 )
java
/*
*
*   发布 订阅  1 -》 多
* */
public class Producer03 {

    private static String EXCHANGE_NAME="logs";

    public static void main(String[] args) throws Exception {
        Channel channels =null;
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        try {
            Connection connection = connectionFactory.newConnection();
            Optional<Channel> channel = connection.openChannel();
            channels =  channel.get();
        }catch (Exception e){
            e.printStackTrace();
        }
        channels.exchangeDeclare(EXCHANGE_NAME,"fanout");
        String data ="";
        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNext()){
            data = scanner.next();
            channels.basicPublish(EXCHANGE_NAME,"",null,data.getBytes(StandardCharsets.UTF_8));
            System.out.println("消息发送成功");

        }
    }



}
---  订阅者  可以有多个  都可以收到消息
    /**
*   rabbitmq  消费者
* @author jd
*
*/
    public class Consumer04 {
        private static String EXCHANGE_NAME="logs";

        /**
* @param args
*/
        public static void main(String[] args) throws Exception{
            Channel channels =null;
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("localhost");
            connectionFactory.setPort(5672);
            connectionFactory.setUsername("admin");
            connectionFactory.setPassword("admin");
            try {
                Connection connection = connectionFactory.newConnection();
                Optional<Channel> channel = connection.openChannel();
                channels =  channel.get();
            }catch (Exception e){
                e.printStackTrace();
            }

            /* 扇出交换机*/
            channels.exchangeDeclare(EXCHANGE_NAME,"fanout");
            /* 临时队列  队列名称随机
*  消费者 断开连接 自动删除
* */
            channels.queueDeclare("ATP",false,false,false,null);
            /*  String queue, String exchange, String routingKey */
            channels.queueBind("ATP",EXCHANGE_NAME,"");
            System.out.println("queue:"+"ATP"+":Consumer04等待接受消息:");
            DeliverCallback callback = (consumerTag, message)->{
                System.out.println("consumerTag:"+consumerTag);
                System.out.println("msg:"+new String(message.getBody()));
            };
            CancelCallback cancelCallback = (consumerTag) ->{

            };
            channels.basicConsume("ATP",true,callback,cancelCallback);

        }
  1. direct

  2. topic

image.png

死信消息 image.pngimage.png