高级Redis之Stream的用法示例

07-08 1489阅读

不想自己搭建一个mq怎么办?Redis的Stream 来帮你,Redis Stream 是 Redis 5.0 引入的一种新的数据结构,用于处理实时的、可持久化的、基于时间序列的数据流。它非常适合处理事件流、日志、消息队列等场景。下面是一个使用 Redis Stream 的具体应用场景:简单的消息队列系统。

高级Redis之Stream的用法示例
(图片来源网络,侵删)

应用场景:实时消息队列

假设你正在构建一个实时消息通知系统,多个服务需要向某个队列写入消息,多个消费者服务需要从这个队列中读取消息执行相应操作。这个消息队列需要有高性能和高可用性,并且能够应对突发流量。

以下是如何使用 Redis Stream 实现完成订单后通知会员服务加积分这个应用场景的步骤:

步骤 1: 添加必要的依赖

在你的 pom.xml 文件中添加 Lettuce 和 Spring Data Redis 依赖:

    
    
        org.springframework.boot
        spring-boot-starter-web
    
    
    
        org.springframework.boot
        spring-boot-starter-data-redis
    
    
    
        io.lettuce.core
        lettuce-core
        6.1.5
    

步骤 2: 配置 Redis 连接

在你的 application.properties 或 application.yml 文件中配置 Redis 连接:

spring:
  redis:
    host: localhost
    port: 6379

步骤 3: 创建订单服务 (生产者)

订单服务在订单完成后将订单信息写入 Redis Stream。可以使用 Lettuce 库来与 Redis 进行交互。

import io.lettuce.core.RedisClient;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.sync.RedisCommands;
import org.springframework.stereotype.Service;
import java.util.HashMap;
import java.util.Map;
@Service
public class OrderService {
    private static final String STREAM_KEY = "order_stream";
    private RedisClient redisClient;
    private StatefulRedisConnection connection;
    private RedisCommands commands;
    public OrderService() {
        this.redisClient = RedisClient.create("redis://localhost:6379");
        this.connection = redisClient.connect();
        this.commands = connection.sync();
    }
    public void completeOrder(String orderId, String userId, int points) {
        Map orderData = new HashMap();
        orderData.put("orderId", orderId);
        orderData.put("userId", userId);
        orderData.put("points", String.valueOf(points));
        String messageId = commands.xadd(STREAM_KEY, orderData);
        System.out.println("Order completed with messageId: " + messageId);
    }
    public void close() {
        connection.close();
        redisClient.shutdown();
    }
}

步骤 4: 创建会员服务 (消费者)

会员服务从 Redis Stream 中读取消息,并处理用户积分的增加。

import io.lettuce.core.RedisClient;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.sync.RedisCommands;
import io.lettuce.core.StreamMessage;
import org.springframework.stereotype.Service;
import java.util.List;
import java.util.Map;
@Service
public class MemberService {
    private static final String STREAM_KEY = "order_stream";
    private static final String CONSUMER_GROUP = "member_group";
    private static final String CONSUMER_NAME = "member_service";
    private RedisClient redisClient;
    private StatefulRedisConnection connection;
    private RedisCommands commands;
    public MemberService() {
        this.redisClient = RedisClient.create("redis://localhost:6379");
        this.connection = redisClient.connect();
        this.commands = connection.sync();
        
        // 创建消费组
        try {
            commands.xgroupCreate(STREAM_KEY, CONSUMER_GROUP, io.lettuce.core.StreamOffset.from("0"), true);
        } catch (Exception e) {
            System.out.println("Consumer group already exists");
        }
    }
    public void consumeMessages() {
        while (true) {
            List messages = commands.xreadgroup(
                    io.lettuce.core.Consumer.from(CONSUMER_GROUP, CONSUMER_NAME),
                    io.lettuce.core.XReadArgs.StreamOffset.lastConsumed(STREAM_KEY)
            );
            for (StreamMessage message : messages) {
                Map body = message.getBody();
                String orderId = body.get("orderId");
                String userId = body.get("userId");
                int points = Integer.parseInt(body.get("points"));
                
                // 处理用户积分增加逻辑
                System.out.println("Processing order: " + orderId + " for user: " + userId + ", adding points: " + points);
                // 确认处理完成
                commands.xack(STREAM_KEY, CONSUMER_GROUP, message.getId());
            }
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
    }
    public void close() {
        connection.close();
        redisClient.shutdown();
    }
}

步骤 5: 调整 Spring Boot 启动类

在 Spring Boot 启动类中启动订单服务和会员服务,演示消息的生产和消费:

import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
@SpringBootApplication
public class RedisStreamDemoApplication {
    public static void main(String[] args) {
        SpringApplication.run(RedisStreamDemoApplication.class, args);
    }
    @Bean
    public CommandLineRunner demo(OrderService orderService, MemberService memberService) {
        return args -> {
            // 模拟订单完成
            orderService.completeOrder("order123", "user1", 100);
            // 启动会员服务,处理消息
            new Thread(() -> memberService.consumeMessages()).start();
            // 等待一段时间,确保消息处理完成
            Thread.sleep(5000);
            orderService.close();
            memberService.close();
        };
    }
}

6. 优点

使用 Redis Stream 实现消息队列有以下几个优点:

  1. 高性能:Redis Stream 提供了高性能的读写操作,适用于高吞吐量的场景。
  2. 持久化:Redis Stream 支持数据持久化,不会因为 Redis 重启而丢失数据。
  3. 消费组:支持创建消费者组,多消费者可以协同工作,提高消费效率。
  4. 自动化管理:Redis 可以自动管理消息的 ID、时间戳等,简化开发。

7. 缺点

  • 内存占用:Redis 是内存数据库,若消息量过大,可能会占用大量内存。
  • 学习曲线:Stream API 的使用相对于其他简单数据结构较为复杂,需要一定的学习成本。

    总结

    通过上述示例,我们展示了如何使用 Redis Stream 实现一个简单的消息队列系统,包括生产者发布消息、消费者读取消息和处理以及消费组的管理。Redis Stream 的高性能、持久化和自动管理特性使其非常适合处理实时数据流、消息队列等场景。希望这个示例能够帮助你更好地理解如何使用 Redis Stream 应对实际开发中的问题。

VPS购买请点击我

文章版权声明:除非注明,否则均为主机测评原创文章,转载或复制请以超链接形式并注明出处。

目录[+]