RabbitMQ支持的消息模型

07-03 697阅读

RabbitMQ支持的消息模型

  • RabbitMQ基础
  • RabbitMQ支持的消息模型

    一、第一种模型(直连)

    我们将用Java编写两个程序,发送单个消息的生成者和接收消息并打印出来的消费者。

    在下图,“P”是生成者,“C”消费者。中间框是一个队列RabbitMQ保留的消息缓冲区 。

    RabbitMQ支持的消息模型

    首先构建一个Maven项目,然后引入依赖。

    
      com.rabbitmq
      amqp-client
      5.10.0
    
    
    定义生产者
    import com.duan.rabbitmq.utils.RabbitMqUtil;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.MessageProperties;
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    /**
    * @author db
    * @version 1.0
    * @description Provider 生产者代码
    * @since 2022/12/29
    */
    public class Producer {
        public static void main(String[] args) throws IOException, TimeoutException {
            //        // 1.创建连接工厂
            //        ConnectionFactory connectionFactory = new ConnectionFactory();
            //        // 2.设置连接属性
            //        connectionFactory.setHost("192.168.137.120");
            //        connectionFactory.setPort(5672);
            //        connectionFactory.setVirtualHost("/");
            //        connectionFactory.setUsername("admin");
            //        connectionFactory.setPassword("123456");
            //        connectionFactory.setHandshakeTimeout(60000);
            //
            //        // 3.从连接工厂获得连接
            //        Connection connection = connectionFactory.newConnection();
            // 从工具类中获得连接
            Connection connection = RabbitMqUtil.getConnection();
            // 4.从连接中获得channel
            Channel channel = connection.createChannel();
            // 5.声明队列queue存储消息
            /**
             * 参数s:队列名称 如果队列不存在就自动创建
             * 参数b:用来定义队列特性是否要持久化 true 持久化队列  false 不持久化
             * 参数b1: exclusive 是否独占队列  true 独占队列 false 不独占
             * 参数b2:autoDelete 是否在消费完成后自动删除队列 true 自动删除 false 不自动删除
             * 参数5:额外附加参数
             *
             */
            channel.queueDeclare("hello",true,false,false,null);
            // 7.发送消息给中间件
            // 参数1:交换机名称 参数2:队列名称 参数3:传递消息的额外设置 参数4:
            channel.basicPublish("","hello", MessageProperties.PERSISTENT_TEXT_PLAIN,"hello rabbitmq".getBytes());
            System.out.println("消息发送成功");
            //        // 8.关闭连接
            //        channel.close();
            //        connection.close();
            RabbitMqUtil.closeConnectionAndChannel(channel,connection);
        }
    }
    

    执行发送,这个时候可以在web控制台查看到这个队列queue的信息。

    定义消费者
    import com.duan.rabbitmq.utils.RabbitMqUtil;
    import com.rabbitmq.client.*;
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    /**
     * @author db
     * @version 1.0
     * @description Consumer  消费者
     * @since 2022/12/29
     */
    public class Consumer {
        public static void main(String[] args) throws IOException, TimeoutException {
    //        ConnectionFactory connectionFactory = new ConnectionFactory();
    //        connectionFactory.setHost("192.168.137.120");
    //        connectionFactory.setPort(5672);
    //        connectionFactory.setVirtualHost("/");
    //        connectionFactory.setUsername("admin");
    //        connectionFactory.setPassword("123456");
    //        connectionFactory.setHandshakeTimeout(60000);
    //
    //        // 创建连接
    //        Connection connection = connectionFactory.newConnection();
            // 从工具类中获得连接
            Connection connection = RabbitMqUtil.getConnection();
            // 创建信道
            Channel channel = connection.createChannel();
            // 消费者成功消费时的回调
            DeliverCallback deliverCallback = (consumerTag,message) ->{
                System.out.println(new String(message.getBody()));
            };
            // 消费者取消消费时的回调
            CancelCallback callback = consumerTag ->{
                System.out.println("消费者取消消费接口的回调");
            };
            // 参数1:消费队列的名称
            // 参数2:消息的自动确认机制(已获得消息就通知MQ消息已被消费)true 打开 false 关闭
            // 参数3:
            channel.basicConsume("hello",true,deliverCallback,callback);
    //        channel.close();
    //        connection.close();
        }
    }
    

    工具类的包装

    package com.duan.rabbitmq.utils;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    /**
     * @author db
     * @version 1.0
     * @description RabbitMqUtil
     * @since 2023/1/2
     */
    public class RabbitMqUtil {
        // 定义提供连接对象的方法
        public static Connection getConnection(){
            try{
                // 1.创建连接工厂
                ConnectionFactory connectionFactory = new ConnectionFactory();
                // 2.设置连接属性
                connectionFactory.setHost("192.168.137.120");
                connectionFactory.setPort(5672);
                connectionFactory.setVirtualHost("/");
                connectionFactory.setUsername("admin");
                connectionFactory.setPassword("123456");
                connectionFactory.setHandshakeTimeout(60000);
                return connectionFactory.newConnection();
            }catch (Exception e){
                e.printStackTrace();
            }
            return null;
        }
        // 关闭连接通道和关闭连接的工具方法
        public static void closeConnectionAndChannel(Channel channel,Connection connection){
            try{
                if(channel !=null){
                    channel.close();
                }
                if(connection != null){
                    connection.close();
                }
            }catch (Exception e){
                e.printStackTrace();
            }
        }
    }
    

    报连接超时错误

    RabbitMQ支持的消息模型

    **解决方案:**原因是连接超时,加超时时间。

    maevn项目设置超时时间:factory.setHandshakeTimeout\(60000\)

    二、第二种模型(work quene)

    work queues被称为任务队列(Task queues)。当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。此时就可以使用work 模型: 让多个消费者绑定到一个队列,共同消费队列中的消息。 队列中的消息一旦消费,就会消失,因此任务是不会被重复执行的。

    RabbitMQ支持的消息模型

    • P:生产者
    • C1:消费者1
    • C2:消费者2
      定义生成者
      package com.duan.rabbitmq.work;
      import com.duan.rabbitmq.utils.RabbitMqUtil;
      import com.rabbitmq.client.Channel;
      import com.rabbitmq.client.Connection;
      import java.io.IOException;
      /**
       * @author db
       * @version 1.0
       * @description Producer
       * @since 2023/3/24
       */
      public class Producer {
          public static void main(String[] args) throws IOException {
              Connection connection = RabbitMqUtil.getConnection();
              Channel channel = connection.createChannel();
              /**
               * 声明队列queue存储信息
               * 参数1: 队列名称
               * 参数2: 用来定义队列是否要持久化
               * 参数3: exclusion 是否是独占队列
               * 参数4: autoDelete 是否再消费完成后自动删除队列
               * 参数5: 额外附加参数
               */
              channel.queueDeclare("work",true,false,false,null);
              for(int i = 0; i
                  // 参数1:交换机名称 参数2:队列名称 参数3:消息传递的额外设置
                  channel.basicPublish("","work",null,(i+"work").getBytes());
              }
              RabbitMqUtil.closeConnectionAndChannel(channel,connection);
          }
      }
      
          public static void main(String[] args) throws IOException {
              Connection connection = RabbitMqUtil.getConnection();
              // 创建信道
              Channel channel = connection.createChannel();
              // 消费者消费成功时的回调
              channel.queueDeclare("work",true,false,false,null);
              channel.basicConsume("work",true,new DefaultConsumer(channel){
                  @Override
                  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException{
                      System.out.println("消费者1: "+new String(body));
                  }
              });
              RabbitMqUtil.closeConnectionAndChannel(channel,connection);
          }
      }
      
          public static void main(String[] args) throws IOException {
              Connection connection = RabbitMqUtil.getConnection();
              // 创建信道
              Channel channel = connection.createChannel();
              // 消费者消费成功时的回调
              channel.queueDeclare("work",true,false,false,null);
              channel.basicConsume("work",true,new DefaultConsumer(channel){
                  @Override
                  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException{
                      try{
                          Thread.sleep(2000);
                      }catch (Exception e){
                          e.printStackTrace();
                      }
                      System.out.println("消费者2: "+new String(body));
                  }
              });
              RabbitMqUtil.closeConnectionAndChannel(channel,connection);
          }
      }
      
          public static void main(String[] args) throws IOException {
              Connection connection = RabbitMqUtil.getConnection();
              Channel channel = connection.createChannel();
              /**
               * 声明队列queue存储信息
               * 参数1: 队列名称
               * 参数2: 用来定义队列是否要持久化
               * 参数3: exclusion 是否是独占队列
               * 参数4: autoDelete 是否再消费完成后自动删除队列
               * 参数5: 额外附加参数
               */
              channel.queueDeclare("work",true,false,false,null);
              for(int i = 0; i
                  // 参数1:交换机名称 参数2:队列名称 参数3:消息传递的额外设置
                  channel.basicPublish("","work",null,(i+"work").getBytes());
              }
              RabbitMqUtil.closeConnectionAndChannel(channel,connection);
          }
      }
      
          public static void main(String[] args) throws IOException {
              Connection connection = RabbitMqUtil.getConnection();
              // 创建信道
              Channel channel = connection.createChannel();
              channel.basicQos(1); // 每次只消费一个消息
              // 消费者消费成功时的回调
              channel.queueDeclare("work",true,false,false,null);
              channel.basicConsume("work",false,new DefaultConsumer(channel){
                  @Override
                  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException{
                      System.out.println("消费者1: "+new String(body));
                      // 手动确认,参数1:消息标识  参数2:每次确认1个
                      channel.basicAck(envelope.getDeliveryTag(),false);
                  }
              });
          }
      }
      
          public static void main(String[] args) throws IOException {
              Connection connection = RabbitMqUtil.getConnection();
              // 创建信道
              Channel channel = connection.createChannel();
              channel.basicQos(1);  // 每次消费一个消息
              // 消费者消费成功时的回调
              channel.queueDeclare("work",true,false,false,null);
              channel.basicConsume("work",false,new DefaultConsumer(channel){
                  @Override
                  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException{
                      try{
                          Thread.sleep(2000);
                      }catch (Exception e){
                          e.printStackTrace();
                      }
                      System.out.println("消费者2: "+new String(body));
                      channel.basicAck(envelope.getDeliveryTag(),false);
                  }
              });
          }
      }
      
          public static void main(String[] args) throws IOException {
              // 获取连接对象
              Connection connection = RabbitMqUtil.getConnection();
              Channel channel = connection.createChannel();
              // 将通道声明交换机  参数1:交换机名称 参数2:交换机类型
              channel.exchangeDeclare("logs","fanout");
              // 发送消息
              channel.basicPublish("logs","",null,"fanout type message".getBytes());
              // 释放资源
              RabbitMqUtil.closeConnectionAndChannel(channel,connection);
          }
      }
      
          public static void main(String[] args) throws IOException {
              // 获得连接对象
              Connection connection = RabbitMqUtil.getConnection();
              Channel channel = connection.createChannel();
              // 通道绑定交换机
              channel.exchangeDeclare("logs","fanout");
              // 绑定临时队列
              String queue = channel.queueDeclare().getQueue();
              // 绑定交换机和队列
              channel.queueBind(queue,"logs","");
              // 消费消息
              channel.basicConsume(queue,true,new DefaultConsumer(channel){
                  @Override
                  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                      System.out.println("消费者1: "+new String(body));
                  }
              });
          }
      }
      
          public static void main(String[] args) throws IOException {
              // 获得连接对象
              Connection connection = RabbitMqUtil.getConnection();
              Channel channel = connection.createChannel();
              // 通道绑定交换机
              channel.exchangeDeclare("logs","fanout");
              // 绑定临时队列
              String queue = channel.queueDeclare().getQueue();
              // 绑定交换机和队列
              channel.queueBind(queue,"logs","");
              // 消费消息
              channel.basicConsume(queue,true,new DefaultConsumer(channel){
                  @Override
                  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                      System.out.println("消费者2: "+new String(body));
                  }
              });
          }
      }
      
          public static void main(String[] args) throws IOException {
              // 获得连接对象
              Connection connection = RabbitMqUtil.getConnection();
              Channel channel = connection.createChannel();
              // 通道绑定交换机
              channel.exchangeDeclare("logs","fanout");
              // 绑定临时队列
              String queue = channel.queueDeclare().getQueue();
              // 绑定交换机和队列
              channel.queueBind(queue,"logs","");
              // 消费消息
              channel.basicConsume(queue,true,new DefaultConsumer(channel){
                  @Override
                  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                      System.out.println("消费者3: "+new String(body));
                  }
              });
          }
      }
      
          public static void main(String[] args) throws IOException {
              // 建立连接
              Connection connection = RabbitMqUtil.getConnection();
              Channel channel = connection.createChannel();
              channel.exchangeDeclare("logs_direct","direct");
              String routingKey = "error";
              channel.basicPublish("logs_direct",routingKey,null,("这是direct模型发布的基于route key: ["+routingKey+"] 发送的消息").getBytes());
              RabbitMqUtil.closeConnectionAndChannel(channel,connection);
          }
      }
      
          public static void main(String[] args) throws IOException {
              // 获得连接对象
              Connection connection = RabbitMqUtil.getConnection();
              Channel channel = connection.createChannel();
              // 通道绑定交换机
              channel.exchangeDeclare("logs_direct","direct");
              // 绑定临时队列
              String queue = channel.queueDeclare().getQueue();
              // 绑定交换机和队列
              channel.queueBind(queue,"logs_direct","error");
              // 消费消息
              channel.basicConsume(queue,true,new DefaultConsumer(channel){
                  @Override
                  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                      System.out.println("消费者1: "+new String(body));
                  }
              });
          }
      }
      
          public static void main(String[] args) throws IOException {
              // 获得连接对象
              Connection connection = RabbitMqUtil.getConnection();
              Channel channel = connection.createChannel();
              // 通道绑定交换机
              channel.exchangeDeclare("logs_direct","direct");
              // 绑定临时队列
              String queue = channel.queueDeclare().getQueue();
              // 绑定交换机和队列
              channel.queueBind(queue,"logs_direct","info");
              channel.queueBind(queue,"logs_direct","error");
              channel.queueBind(queue,"logs_direct","warning");
              // 消费消息
              channel.basicConsume(queue,true,new DefaultConsumer(channel){
                  @Override
                  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                      System.out.println("消费者1: "+new String(body));
                  }
              });
          }
      }
      
          public static void main(String[] args) throws IOException {
              //获取连接对象
              Connection connection = RabbitMqUtil.getConnection();
              Channel channel = connection.createChannel();
              //声明交换机以及交换机类型 topic
              channel.exchangeDeclare("topics","topic");
              //发布消息
              String routekey = "save.user.delete";
              channel.basicPublish("topics",routekey,null,("这里是topic动态路由模型,routekey: ["+routekey+"]").getBytes());
              //关闭资源
              RabbitMqUtil.closeConnectionAndChannel(channel,connection);
          }
      }
      
          public static void main(String[] args) throws IOException {
              // 获得连接对象
              Connection connection = RabbitMqUtil.getConnection();
              Channel channel = connection.createChannel();
              // 通道绑定交换机
              channel.exchangeDeclare("topics","topic");
              // 绑定临时队列
              String queue = channel.queueDeclare().getQueue();
              // 绑定交换机和队列
              channel.queueBind(queue,"topics","*.user.*");
              // 消费消息
              channel.basicConsume(queue,true,new DefaultConsumer(channel){
                  @Override
                  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                      System.out.println("消费者1: "+new String(body));
                  }
              });
          }
      }
      
          public static void main(String[] args) throws IOException {
              // 获得连接对象
              Connection connection = RabbitMqUtil.getConnection();
              Channel channel = connection.createChannel();
              // 通道绑定交换机
              channel.exchangeDeclare("topics","topic");
              // 绑定临时队列
              String queue = channel.queueDeclare().getQueue();
              // 绑定交换机和队列
              channel.queueBind(queue,"topics","*.user.#");
              // 消费消息
              channel.basicConsume(queue,true,new DefaultConsumer(channel){
                  @Override
                  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                      System.out.println("消费者1: "+new String(body));
                  }
              });
          }
      }
      
VPS购买请点击我

文章版权声明:除非注明,否则均为主机测评原创文章,转载或复制请以超链接形式并注明出处。

目录[+]