MQ四兄弟:如何保证消息顺序性

07-10 1161阅读

在当今的分布式系统架构中,消息队列(MQ)是不可或缺的组成部分。它们在确保系统组件之间高效通信方面发挥着关键作用。特别是在金融交易、物流跟踪等对消息处理顺序有严格要求的场景中,消息队列的顺序性保证显得更为重要。接下来,我们将深入探讨RabbitMQ、RocketMQ、Kafka和Pulsar这四个广泛使用的消息队列系统,分析它们是如何确保消息的顺序性,并附上相应的代码示例。

MQ四兄弟:如何保证消息顺序性

RabbitMQ

RabbitMQ作为一款成熟的开源消息队列,,基于AMQP(Advanced Message Queuing Protocol)协议构建,广泛应用于企业级应用中。虽然RabbitMQ本身并不保证严格的全局顺序性,但可以通过特定的设计模式来实现消息顺序性。

  1. 单一队列和单一消费者模式:确保一个队列只被一个消费者消费,这样可以保证消息按照发送的顺序被处理。因为队列本身就是一个先进先出的结构。
  2. 消息排序:在消息生产者端,为消息添加序列号或时间戳,消费者端根据这些信息对消息进行排序。

以下是一个简单的Java代码片段,展示了如何在RabbitMQ中发送消息。请注意,这个例子没有包含消息排序的逻辑,因为它依赖于具体的业务场景和消息结构。

public class Send {
    private final static String QUEUE_NAME = "hello";
    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            String message = "Hello World!";
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println(" [x] Sent '" + message + "'");
        }
    }
}

创建了一个连接和一个通道,然后声明了一个队列。之后,我们发布了一个简单的消息到队列中。为了保证消息的顺序性,我们需要确保所有消息都是通过同一个通道发送,并且在消费端也是由同一个消费者按顺序接收处理。

RocketMQ

RocketMQ作为阿里巴巴开源的分布式消息队列,在保证消息顺序性方面提供了一种基于MessageQueueSelector的解决方案。其核心思路是将有序的消息写入特定的队列,从而使消费端固定消费某个队列时,就能够按顺序消费消息。

具体来说,RocketMQ中有两个重要概念:

  • Topic: 逻辑上的消息主题
  • MessageQueue: 物理上存储消息的队列

    一个Topic包含多个MessageQueue,消息会根据其内容进行哈希计算,分配到不同的MessageQueue中。用户可以通过提供MessageQueueSelector,对特定类型的消息强制分配到同一个MessageQueue,从而保证顺序性。

    示例代码:

    生产者

    // 实例化消息生产者Producer
    DefaultMQProducer producer = new DefaultMQProducer("unique_group_name");
    // 设置NameServer的地址
    producer.setNamesrvAddr("nameserver:9876");
    // 启动Producer实例
    producer.start();
    // 创建消息,并指定Topic,Tag和消息体
    Message msg = new Message("TopicTest", "TagA", "OrderID" + orderId, ("Hello RocketMQ " + i).getBytes());
    // 发送有序消息
    producer.send(msg, new MessageQueueSelector() {
        @Override
        public MessageQueue select(List mqs, Message msg, Object arg) {
            Integer orderId = (Integer) arg; // 订单ID作为选择器的参数
            int index = orderId % mqs.size(); // 根据订单ID计算MessageQueue索引
            return mqs.get(index); // 返回该索引对应的MessageQueue
        }
    }, orderId);

    通过上述代码,发送端可以将具有相同订单号的消息发送到同一个MessageQueue。

    消费端

     DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("unique_group_name");
        consumer.setNamesrvAddr("nameserver:9876");
        consumer.subscribe("TopicTest", "TagA");
        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List msgs, ConsumeOrderlyContext context) {
                context.setAutoCommit(true);
                for (MessageExt msg : msgs) {
                    System.out.printf("Consumer: %s %n", new String(msg.getBody()));
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });

    消费端只需固定消费指定的MessageQueue,即可以保证消息按顺序被消费。

    Kafka

    Kafka通过Partition(分区)的概念来保证消息的顺序性。同一个Partition中的消息是有序的,但不同Partition之间是无序的。Producer在发送消息时可以指定消息要发送到的分区。Kafka默认提供了基于key的分区策略,确保具有相同key的消息会被发送到同一个分区,从而保证这些消息在这个分区内的顺序性。

    以下是一个简单的 Java 代码示例,展示了如何在 Kafka 中发送和消费有序消息:

    生产者代码

    public class OrderProducer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            KafkaProducer producer = new KafkaProducer(props);
            // 假设我们有10个订单,每个订单的消息需要顺序处理
            for (int orderId = 0; orderId  
    

    producer.send(new ProducerRecord("OrderTopic", Integer.toString(orderId), message));

    第二个参数是消息的键(key),这里使用订单ID作为键,确保相同订单ID的消息发送到同一个分区

    消费者代码

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "order_consumer_group");
    KafkaConsumer consumer = new KafkaConsumer(props);
    consumer.subscribe(Collections.singletonList("OrderTopic"));
    

    在生产者代码中,我们使用了相同的 key(即订单ID)来确保消息被发送到同一个 Partition。在消费者代码中,我们订阅了整个 Topic,但由于我们使用了相同的 key 来发送消息,Kafka 会自动将具有相同 key 的消息路由到同一个 Partition,从而保证顺序性。

    Pulsar

    Apache Pulsar 通过分区主题(Partitioned Topics)来保证消息的顺序性。在Pulsar中,每个分区可以看作是一个独立的消息队列,分区内的消息保持发送顺序。为了确保消息的顺序性,生产者在发送消息时需要指定一个键(Key),Pulsar会根据这个键将消息路由到特定的分区。这样,具有相同键的消息就会被发送到同一个分区,并且按照发送的顺序进行消费。

    生产者代码示例:

    public class PulsarOrderProducer {
        public static void main(String[] args) throws Exception {
            PulsarClient client = PulsarClient.builder()
                    .serviceUrl("pulsar://localhost:6650")
                    .build();
            Producer producer = client.newProducer(Schema.STRING)
                    .topic("persistent://public/default/my-topic")
                    .create();
            for (int i = 0; i  
    

    消费者代码示例:

    public class PulsarOrderConsumer {
        public static void main(String[] args) throws Exception {
            PulsarClient client = PulsarClient.builder()
                    .serviceUrl("pulsar://localhost:6650")
                    .build();
            Consumer consumer = client.newConsumer(Schema.STRING)
                    .topic("persistent://public/default/my-topic")
                    .subscriptionName("my-subscription")
                    .subscriptionType(SubscriptionType.Exclusive)
                    .subscribe();
            while (true) {
                Message msg = consumer.receive();
                try {
                    // 处理消息
                    System.out.printf("Message with key %s: %s", msg.getKey(), msg.getValue());
                    consumer.acknowledge(msg);
                } catch (Exception e) {
                    consumer.negativeAcknowledge(msg);
                }
            }
        }
    }

    在消费者代码中,我们使用了SubscriptionType.Exclusive,使订阅被独占,确保只有一个消费者能够消费分区内的消息,从而保证了消息的顺序性。

    总结

    MQ四兄弟:如何保证消息顺序性

    尽管RabbitMQ、RocketMQ、Kafka和Pulsar这些消息队列系统虽然在实现细节上有所不同,但它们保证消息顺序性的核心思想都是相似的,即确保具有相同特征的消息被发送到同一队列或分区中,由于队列数据结构本身就是先进先出的结构,因此只需要消费者从该队列按顺序消费,就能够保证消息的有序性。

VPS购买请点击我

文章版权声明:除非注明,否则均为主机测评原创文章,转载或复制请以超链接形式并注明出处。

目录[+]