• 设为首页
  • 收藏本站
  • 积分充值
  • VIP赞助
  • 手机版
  • 微博
  • 微信
    微信公众号 添加方式:
    1:搜索微信号(888888
    2:扫描左侧二维码
  • 快捷导航
    福建二哥 门户 查看主题

    解读Redis秒杀优化方案(阻塞队列+基于Stream流的消息队列)

    发布者: 娅水9213 | 发布时间: 2025-6-19 12:43| 查看数: 44| 评论数: 0|帖子模式

    Redis秒杀优化方案(阻塞队列+Stream流的消息队列)

    下面是我们的秒杀流程:

    对于正常的秒杀处理,我们需要多次查询数据库,会给数据库造成相当大的压力,这个时候我们需要加入缓存,进而缓解数据库压力。
    在上面的图示中,我们可以将一条流水线的任务拆成两条流水线来做,如果我们直接将判断秒杀库存与校验一人一单放在流水线A上,剩下的放在另一条流水线B,那么如果流水线A就可以相当于服务员直接判断是否符合资格,如果符合资格那么直接生成信息给另一条流水线B去处理业务,这里的流水线就是咱们的线程,而流水线A也是基于数据库进行查询,也会压力数据库,那么这种情况我们就可以将待查询信息保存在Redis缓存中。
    但是我们不能再流水线A判断完成后去直接调用流水线B,这样的效率是大打折扣的,这种情况我们需要开启独立线程去执行流水线B的操作,如何知道给哪个用户创建订单呢?这个时候就要流水线A在判断成功后去生成信息给独立线程。
    最后的业务就变成,用户直接访问流水线A,通过流水线A去判断,如果通过则生成信息给流水线B去创建订单,过程如下图:

    那么什么样的数据结构满足下面条件:

    • ① 一个key能够保存很多值
    • ②唯一性:一人一单需要保证用户id不能重复。
    所以我们需要使用set:

    那么如何判断校验用户的购买资格呢?

    而上述判断需要保证原子性,所以我们需要使用Lua脚本进行编写:
    1. local voucherId = ARGV[1]; -- 优惠劵id
    2. local userId = ARGV[2]; -- 用户id

    3. -- 库存key
    4. local stockKey = 'seckill:stock' .. voucherId; -- 拼接
    5. -- 订单key
    6. local stockKey = 'seckill:stock' .. voucherId; -- 拼接
    7. -- 判断库存是否充足
    8. if(tonumber(redis.call('get',stockKey) <= 0)) then
    9.     -- 库存不足,返回1
    10.     return 1;
    11. end;
    12. -- 判断用户是否下单
    13. if(redis.call('sismember',orderKey,userId)) then
    14.     -- 存在,说明重复下单,返回2
    15.     return 2;
    16. end
    17. -- 扣减库存 incrby stockKey -1
    18. redis.call('incrby',stockKey,-1);
    19. -- 下单(保存用户) sadd orderKey userId
    20. redis.call('sadd',orderKey,userId);
    21. return 0;
    复制代码
    之后我们按照下面步骤来实现代码:

    在方法体内执行Lua脚本来原子性判断,然后判断是否能够处理并传入阻塞队列:
    1. @Slf4j
    2. @Service
    3. public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
    4.     @Autowired
    5.     private ISeckillVoucherService seckillVoucherService;
    6.     @Autowired
    7.     private RedisIdWorker redisIdWorker;
    8.     @Resource
    9.     private StringRedisTemplate stringRedisTemplate;
    10.     @Resource
    11.     private RedissonClient redissonClient;
    12.     private static final DefaultRedisScript<Long> SECKILL_SCRIPT; // 泛型内填入返回值类型
    13.     static { // 静态属性要使用静态代码块进行初始化
    14.         SECKILL_SCRIPT = new DefaultRedisScript<>();
    15.         SECKILL_SCRIPT.setResultType(Long.class);
    16.         SECKILL_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));
    17.     }
    18.     public Result seckillVoucherMax(Long voucherId) {
    19.         // 获取用户信息
    20.         Long userId = UserHolder.getUser().getId();
    21.         // 1.执行Lua脚本来判断用户资格
    22.         Long result = stringRedisTemplate.execute(
    23.                             SECKILL_SCRIPT,
    24.                             Collections.emptyList(), // Lua无需接受key
    25.                             voucherId.toString(),
    26.                             userId.toString()
    27.                         );
    28.         // 2.判断结果是否为0
    29.         int r = result.intValue();
    30.         if(r != 0) {
    31.             // 不为0代表无资格购买
    32.             return Result.fail(r == 1 ? "库存不足" : "不能重复下单");
    33.         }
    34.         // 3.有购买资格则将下单信息保存到阻塞队列中
    35.         // ...
    36.         return Result.ok();
    37.     }

    38. }
    复制代码
    接下来我们创建阻塞队列,线程池以及线程方法,随后使用Springboot提供的注解在@PostConstruct去给线程池传入线程方法:
    1. @Slf4j
    2. @Service
    3. public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
    4.     @Autowired
    5.     private ISeckillVoucherService seckillVoucherService;
    6.     @Autowired
    7.     private RedisIdWorker redisIdWorker;
    8.     @Resource
    9.     private StringRedisTemplate stringRedisTemplate;
    10.     @Resource
    11.     private RedissonClient redissonClient;
    12.     private static final DefaultRedisScript<Long> SECKILL_SCRIPT; // 泛型内填入返回值类型
    13.     static { // 静态属性要使用静态代码块进行初始化
    14.         SECKILL_SCRIPT = new DefaultRedisScript<>();
    15.         SECKILL_SCRIPT.setResultType(Long.class);
    16.         SECKILL_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));
    17.     }
    18.     private BlockingQueue<VoucherOrder> orderTasks = new ArrayBlockingQueue<>(1024 * 1024); // 创建阻塞队列
    19.     private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();  // 创建线程池
    20.     // 让大类在开始初始化时就能够执行线程任务
    21.     @PostConstruct
    22.     private void init() {
    23.         SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderTask());
    24.     }
    25.     // 创建线程任务
    26.     private class VoucherOrderTask implements Runnable {
    27.         @Override
    28.         public void run() {
    29.             while(true){
    30.                 try {
    31.                     // 获取队列中的订单信息
    32.                     VoucherOrder voucherOrder = orderTasks.take();// 取出头部信息
    33.                     // 创建订单
    34.                     handleVoucherOrder(voucherOrder);
    35.                 } catch (Exception e) {
    36.                     log.error("处理订单异常",e);
    37.                 }
    38.             }
    39.         }
    40.     }
    41.     // 创建订单
    42.     private void handleVoucherOrder(VoucherOrder voucherOrder) {
    43.         RLock lock = redissonClient.getLock("lock:order:" + voucherOrder.getUserId().toString());
    44.         boolean isLock = lock.tryLock();
    45.         // 判断是否获取锁成功
    46.         if (!isLock) {
    47.             // 获取锁失败,返回错误或重试
    48.             log.error("不允许重复下单");
    49.             return ;
    50.         }
    51.         try {
    52.             proxy.createVoucherOrderMax(voucherOrder);
    53.         } finally {
    54.             lock.unlock();
    55.         }
    56.     }
    57.     @Override
    58.     public void createVoucherOrderMax(VoucherOrder voucherOrder) {
    59.         // 一人一单
    60.         Long userId = voucherOrder.getUserId();
    61.         // 查询订单
    62.         int count = query().eq("user_id",userId).eq("voucher_id", voucherOrder.getVoucherId()).count();
    63.         // 判断是否存在
    64.         if(count > 0){
    65.             // 用户已经购买过
    66.             log.error("用户已经购买过");
    67.             return ;
    68.         }
    69.         // CAS改进:将库存判断改成stock > 0以此来提高性能
    70.         boolean success = seckillVoucherService.update()
    71.                 .setSql("stock= stock -1") // set stock = stock - 1
    72.                 .eq("voucher_id", voucherOrder.getVoucherId()).eq("stock",0) // where id = ? and stock > 0
    73.                 .update();
    74.         if (!success) {
    75.             //扣减库存
    76.             log.error("库存不足!");
    77.             return ;
    78.         }
    79.         //6.创建订单
    80.         save(voucherOrder);
    81.     }
    82.     private IVoucherOrderService proxy; // 代理对象
    83.     public Result seckillVoucherMax(Long voucherId) {
    84.         // 获取用户信息
    85.         Long userId = UserHolder.getUser().getId();
    86.         // 1.执行Lua脚本来判断用户资格
    87.         Long result = stringRedisTemplate.execute(
    88.                             SECKILL_SCRIPT,
    89.                             Collections.emptyList(), // Lua无需接受key
    90.                             voucherId.toString(),
    91.                             userId.toString()
    92.                         );
    93.         // 2.判断结果是否为0
    94.         int r = result.intValue();
    95.         if(r != 0) {
    96.             // 不为0代表无资格购买
    97.             return Result.fail(r == 1 ? "库存不足" : "不能重复下单");
    98.         }
    99.         // 3.有购买资格则将下单信息保存到阻塞队列中
    100.         Long orderId = redisIdWorker.nextId("order");
    101.         // 创建订单
    102.         VoucherOrder voucherOrder = new VoucherOrder();
    103.         voucherOrder.setId(orderId);
    104.         voucherOrder.setUserId(userId);
    105.         voucherOrder.setVoucherId(voucherId);
    106.         // 放入阻塞队列
    107.         orderTasks.add(voucherOrder);
    108.         // 4.获取代理对象(线程异步执行,需要手动在方法内获取)
    109.         proxy = (IVoucherOrderService)AopContext.currentProxy(); // 获取当前类的代理对象  (需要引入aspectjweaver依赖,并且在实现类加入@EnableAspectJAutoProxy(exposeProxy = true)以此来暴露代理对象)
    110.         return Result.ok();
    111.     }

    112. }
    复制代码
    在上面代码中,我们使用下面代码创建了一个单线程的线程池。它保证所有提交的任务都按照提交的顺序执行,每次只有一个线程在工作。
    1. private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();
    复制代码
    下面代码是一个常见的阻塞队列实现,具有固定大小(在这里是
    1. 1024 * 1024
    复制代码
    ),它的作用是缓冲和排队任务。
    1. ArrayBlockingQueue
    复制代码
    是一个线程安全的队列,它会自动处理线程之间的同步问题。当队列满时,调用
    1. put()
    复制代码
    方法的线程会被阻塞,直到队列有空间;当队列为空时,调用
    1. take()
    复制代码
    方法的线程会被阻塞,直到队列中有数据。
    1. private BlockingQueue<VoucherOrder> orderTasks = new ArrayBlockingQueue<>(1024 * 1024);
    复制代码
    在下面代码中,
    1. orderTasks
    复制代码
    阻塞队列用于存放需要处理的订单对象,每个订单的处理逻辑都由
    1. VoucherOrderTask
    复制代码
    线程池中的线程异步执行:
    1. VoucherOrder voucherOrder = orderTasks.take();
    2. handleVoucherOrder(voucherOrder);
    复制代码
    之后我们需要调用 Runnable 接口去实现VoucherOrderTask类以此来创建线程方法
    1. private class VoucherOrderTask implements Runnable {
    2.     @Override
    3.     public void run() {
    4.         while (true) {
    5.             try {
    6.                 // 获取队列中的订单信息
    7.                 VoucherOrder voucherOrder = orderTasks.take(); // 获取订单
    8.                 // 创建订单
    9.                 handleVoucherOrder(voucherOrder);
    10.             } catch (Exception e) {
    11.                 log.error("处理订单异常", e);
    12.             }
    13.         }
    14.     }
    15. }
    复制代码
    随后将线程方法通过 submit() 方法将
    1. VoucherOrderTask
    复制代码
    提交到线程池中,这个任务是一个无限循环的任务,它会不断从阻塞队列中取出订单并处理,直到线程池关闭。
    这种方式使得订单处理任务可以异步执行,而不阻塞主线程,提高了系统的响应能力:
    1. @PostConstruct
    2. private void init() {
    3.     SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderTask());
    4. }
    复制代码
    但是在高并发的情况下就会产生大量订单,就会超出JVM阻塞队列的上线,并且每当服务重启或者宕机的情况发生,阻塞队列的所有订单任务就都会丢失。
    所以为了解决这种情况,我们就要使用消息队列去解决这个问题:

    什么是消息队列?

    消息队列(Message Queue, MQ)是一种用于在应用程序之间传递消息的通信方式。它允许应用程序通过发送和接收消息来解耦,从而提高系统的可扩展性、可靠性和灵活性。消息队列通常用于异步通信、任务队列、事件驱动架构等场景。
    消息队列的核心概念 :

    • 生产者(Producer):发送消息到消息队列的应用程序。
    • 消费者(Consumer):从消息队列中接收并处理消息的应用程序。
    • 队列(Queue):消息的存储区域,生产者将消息发送到队列,消费者从队列中获取消息。
    • 消息(Message):在生产者与消费者之间传递的数据单元。
    • Broker:消息队列的服务器,负责接收、存储和转发消息。

    消息队列是在JVM以外的一个独立的服务,能够不受JVM内存的限制,并且存入MQ的信息都可以做持久化存储。
    详细教学可以查询下面链接:微服务架构 --- 使用RabbitMQ进行异步处理
    但是这样的方式是需要额外提供服务的,所以我们可以使用Redis提供的三种不同的方式来实现消息队列

    • List 结构实现消息队列
    • Pub/Sub(发布/订阅)模式
    • Stream 结构(Redis 5.0 及以上版本)(推荐使用)(详细介绍)
    使用 List 结构实现消息队列:
    Redis 的 List 数据结构是一个双向链表,支持从头部或尾部插入和弹出元素。我们可以利用
    1. LPUSH
    复制代码
    1. BRPOP
    复制代码
    命令实现一个简单的消息队列。
    实现步骤:

    • 生产者:使用
      1. LPUSH
      复制代码
      将消息推入队列。
    • 消费者:使用
      1. BRPOP
      复制代码
      阻塞地从队列中获取消息。
    生产者代码:
    1. import redis.clients.jedis.Jedis;

    2. public class ListProducer {
    3.     public static void main(String[] args) {
    4.         Jedis jedis = new Jedis("localhost", 6379); // 连接 Redis
    5.         String queueName = "myQueue";

    6.         // 发送消息
    7.         for (int i = 1; i <= 5; i++) {
    8.             String message = "Message " + i;
    9.             jedis.lpush(queueName, message); // 将消息推入队列
    10.             System.out.println("Sent: " + message);
    11.         }

    12.         jedis.close(); // 关闭连接
    13.     }
    14. }
    复制代码
    消费者代码:
    1. import redis.clients.jedis.Jedis;

    2. public class ListConsumer {
    3.     public static void main(String[] args) {
    4.         Jedis jedis = new Jedis("localhost", 6379); // 连接 Redis
    5.         String queueName = "myQueue";

    6.         while (true) {
    7.             // 阻塞获取消息,超时时间为 0(无限等待)
    8.             var result = jedis.brpop(0, queueName);
    9.             String message = result.get(1); // 获取消息内容
    10.             System.out.println("Received: " + message);
    11.         }
    12.     }
    13. }
    复制代码

    • 优点:简单易用,适合轻量级场景。
    • 缺点不支持消息确认机制,消息一旦被消费(从队列内取出)就会从队列中删除。并且只支持单消费者(一个消息只能拿出一次)
    使用 Pub/Sub 模式实现消息队列:
    Redis 的 Pub/Sub 模式是一种发布-订阅模型,生产者将消息发布到频道,消费者订阅频道以接收消息。
    实现步骤:

    • 生产者:使用
      1. PUBLISH
      复制代码
      命令向频道发布消息。
    • 消费者:使用
      1. SUBSCRIBE
      复制代码
      命令订阅频道。
    生产者代码:
    1. import redis.clients.jedis.Jedis;

    2. public class PubSubProducer {
    3.     public static void main(String[] args) {
    4.         Jedis jedis = new Jedis("localhost", 6379); // 连接 Redis
    5.         String channelName = "myChannel";

    6.         // 发布消息
    7.         for (int i = 1; i <= 5; i++) {
    8.             String message = "Message " + i;
    9.             jedis.publish(channelName, message); // 发布消息到频道
    10.             System.out.println("Published: " + message);
    11.         }

    12.         jedis.close(); // 关闭连接
    13.     }
    14. }
    复制代码
    消费者代码:
    1. import redis.clients.jedis.Jedis;
    2. import redis.clients.jedis.JedisPubSub;

    3. public class PubSubConsumer {
    4.     public static void main(String[] args) {
    5.         Jedis jedis = new Jedis("localhost", 6379); // 连接 Redis
    6.         String channelName = "myChannel";

    7.         // 创建订阅者
    8.         JedisPubSub subscriber = new JedisPubSub() {
    9.             @Override
    10.             public void onMessage(String channel, String message) {
    11.                 System.out.println("Received: " + message);
    12.             }
    13.         };

    14.         // 订阅频道
    15.         jedis.subscribe(subscriber, channelName);
    16.     }
    17. }
    复制代码

    • 优点:支持一对多的消息广播。
    • 缺点:消息是即时的,如果消费者不在线,消息会丢失。
    但是上面两方式都是有缺点的:

    • 不支持消息确认机制,消息一旦被消费(从队列内取出)就会从队列中删除。并且只支持单消费者(一个消息只能拿出一次)
    • 消息是即时的,如果消费者不在线,消息会丢失。
    所以根据上面的两种方式,我们推出一款全新的方式 ->
    使用 Stream 结构实现消息队列:
    Redis Stream 是一种强大的数据结构,用于管理消息流。它将消息存储在 Redis 中,并允许消费者按顺序获取消息。Stream 具有以下特点:

    • 有序消息:消息按插入顺序排列。
    • 消费者组:一个消费者组可以有多个消费者,每个消费者可以独立消费不同的消息。
    • 消息 ID:每条消息都有唯一的 ID(如:
      1. 1588890470850-0
      复制代码
      ),ID 按时间戳生成。
    • 自动分配消息:多个消费者可以从 Stream 中并行消费消息,保证消息不会重复消费。


    在 Redis Stream 中,一个队列可以有多个消费者组,每个消费者组可以独立地消费队列中的消息。每个消费者组内有多个消费者,而消费者是基于 消费者名称 进行识别的。

    消费者组的工作方式


    • 每个消费者组拥有自己的 消费进度,也就是每个消费者组会从 自己独立的消息 ID 开始消费
    • 多个消费者组之间是相互独立的,即使它们消费的是同一个队列,它们也可以从不同的位置开始消费队列中的消息。
    • 每个消费者组都可以有多个 消费者(在同一个组内,多个消费者可以并行消费同一个队列的消息,但每个消息在消费者组内只能被一个消费者处理一次)。
    假设有一个队列(Stream)
    1. mystream
    复制代码
    ,可以为它创建多个消费者组:
    1. XGROUP CREATE mystream group1 $ MKSTREAM
    2. XGROUP CREATE mystream group2 $ MKSTREAM
    复制代码
    这样,
    1. mystream
    复制代码
    队列上就有了两个消费者组:
    1. group1
    复制代码
    1. group2
    复制代码
    。每个消费者组可以有自己的消费者并从该队列中读取消息。此时,
    1. group1
    复制代码
    1. group2
    复制代码
    都在消费同一个队列
    1. mystream
    复制代码
    ,但它们的消费进度是独立的,它们各自有自己的消息 ID 记录。
    每个消费者组可以有多个消费者,而每个消费者通过一个 唯一的消费者名称 来标识。

    每个消费者组有独立的消费进度

    每个消费者组会记录自己的消费进度,也就是它消费到队列中的 哪个消息 ID。即使多个消费者组在消费同一个消息队列,它们每个组都会从 不同的消费位置(消息 ID)开始读取消息。
    例如,假设有一个队列
    1. mystream
    复制代码
    ,同时有两个消费者组
    1. group1
    复制代码
    1. group2
    复制代码
    ,它们都从
    1. mystream
    复制代码
    队列中读取消息:

      1. group1
      复制代码
      1. mystream
      复制代码
      队列中的消息
      1. id1
      复制代码
      开始消费,
      1. group1
      复制代码
      的进度会记录在 Redis 中。
      1. group2
      复制代码
      1. mystream
      复制代码
      队列中的消息
      1. id2
      复制代码
      开始消费,
      1. group2
      复制代码
      的进度也会记录在 Redis 中。
    消费进度互不干扰,即便
    1. group1
    复制代码
    1. group2
    复制代码
    都在消费
    1. mystream
    复制代码
    队列,它们的消费位置是独立的。

    消费者组内部的消息消费

    一个消费者组内的消费者会 共享 组内的消息。即使有多个消费者,每条消息 在消费者组内部只会被 一个消费者 消费。消费者之间会并行处理消息,但每条消息只会被一个消费者处理。
    举个例子:假设
    1. group1
    复制代码
    中有三个消费者
    1. consumer1
    复制代码
    1. consumer2
    复制代码
    1. consumer3
    复制代码
    ,如果队列
    1. mystream
    复制代码
    有 6 条消息,那么它们会如下消费:

      1. consumer1
      复制代码
      处理消息
      1. 1
      复制代码
      1. 2
      复制代码
      1. consumer2
      复制代码
      处理消息
      1. 3
      复制代码
      1. 4
      复制代码
      1. consumer3
      复制代码
      处理消息
      1. 5
      复制代码
      1. 6
      复制代码
    但对于消费者组
    1. group2
    复制代码
    ,如果它有自己的消费者,
    1. group2
    复制代码
    内的消费者也会并行消费
    1. mystream
    复制代码
    中的消息,而
    1. group1
    复制代码
    1. group2
    复制代码
    之间没有直接关系。
    首先初始化一个消息队列:
    在项目启动时,确保 Redis 中存在对应的 Stream 和消费者组。可以通过程序在启动时检查并创建(如果不存在的话)。
    1. @Configuration
    2. public class RedisStreamConfig {

    3.     @Autowired
    4.     private StringRedisTemplate redisTemplate;

    5.     private static final String STREAM_KEY = "mystream";
    6.     private static final String GROUP_NAME = "mygroup";

    7.     @PostConstruct
    8.     public void init() {
    9.         // 检查消费者组是否存在,若不存在则创建
    10.         try {
    11.             // 如果消费者组不存在则会抛出异常,我们捕获异常进行创建
    12.             redisTemplate.opsForStream().groups(STREAM_KEY);
    13.         } catch (Exception e) {
    14.             // 创建消费者组,起始位置为 $ 表示从末尾开始消费新消息
    15.             redisTemplate.opsForStream().createGroup(STREAM_KEY, GROUP_NAME);
    16.         }
    17.     }
    18. }
    复制代码
    注意:

      1. opsForStream().groups(STREAM_KEY)
      复制代码
      :查询消费者组是否已存在。
      1. opsForStream().createGroup(STREAM_KEY, GROUP_NAME)
      复制代码
      :如果没有消费者组,则创建一个新的组。
    随后我们生产者发送消息示例:
    1. @Service  
    2. public class RedisStreamProducerService {  // 定义生产者服务类 RedisStreamProducerService

    3.     private static final String STREAM_KEY = "mystream";  // 定义 Redis Stream 的名称,这里指定队列名为 "mystream"

    4.     @Autowired  
    5.     private StringRedisTemplate redisTemplate;

    6.     public void sendMessage(String content) {  // 定义一个方法,发送消息到 Redis Stream,参数 content 是消息的内容
    7.         Map<String, String> map = new HashMap<>();  // 创建一个 Map 用来存储消息内容
    8.         map.put("content", content);  // 将消息内容添加到 Map 中,键是 "content",值是传入的内容

    9.         // 在消息队列中添加消息,调用 StringRedisTemplate 的 opsForStream 方法
    10.         RecordId recordId = redisTemplate.opsForStream()  // 获取操作 Redis Stream 的操作对象
    11.                 .add(StreamRecords.objectBacked(map)  // 创建一个 Stream 记录,将 Map 转化为对象记录
    12.                 .withStreamKey(STREAM_KEY));  // 设置该记录属于的 Stream(消息队列)的名称
    13.         // 输出记录的 ID,表示消息已经成功发送
    14.         System.out.println("消息发送成功,id: " + recordId.getValue());  // 打印消息的 ID,表明该消息已经被成功加入到 Stream 中
    15.     }
    16. }
    复制代码
    1. RecordId
    复制代码
    是 Spring Data Redis 中的一个类,用来表示 消息的唯一标识符。它对应 Redis Stream 中的 消息 ID,该 ID 是 Redis Stream 中每条消息的唯一标识。Redis 中的消息 ID 通常是由时间戳和序号组成的(如
    1. 1588890470850-0
    复制代码
    )。
    主要功能:

    • 表示消息 ID
      1. RecordId
      复制代码
      是一个封装类,表示 Redis Stream 中消息的 ID。
    • 用于识别和操作消息:在消费和确认消息时,
      1. RecordId
      复制代码
      用来标识每条消息的唯一性,并帮助 Redis 确定消息是否已经被消费
    使用场景:
    1. RecordId
    复制代码
    用来标识从 Stream 中读取到的消息,我们可以通过
    1. RecordId
    复制代码
    来进行消息的确认、删除或其他操作。
    1. RecordId recordId = redisTemplate.opsForStream().add(StreamRecords.objectBacked(map).withStreamKey("mystream"));
    复制代码
    通过
    1. StreamRecords.objectBacked(map)
    复制代码
    1. map
    复制代码
    对象作为消息内容,并用
    1. add
    复制代码
    方法将其写入 Stream。
    在然后编写消费者服务:
    使用 RedisTemplate 的
    1. read
    复制代码
    方法(底层执行的是
    1. XREADGROUP
    复制代码
    命令)从消费者组中拉取消息,并进行处理。消费者可以采用定时任务或后台线程不断轮询
    1. @Slf4j  
    2. @Service  
    3. public class RedisStreamConsumerService {
    4.     private static final String STREAM_KEY = "mystream";  // Redis Stream 的名称,这里指定队列名为 "mystream"
    5.     private static final String GROUP_NAME = "mygroup";  // 消费者组的名称,多个消费者可以通过组名共享消费队列
    6.     private static final String CONSUMER_NAME = "consumer-1";  // 消费者的名称,消费者名称在同一消费者组内必须唯一

    7.     @Autowired  
    8.     private StringRedisTemplate redisTemplate;

    9.     @PostConstruct  // 使用该注解能让方法在 Spring 完成依赖注入后自动调用,用于初始化任务
    10.     @Async  // 将该方法标记为异步执行,允许它在单独的线程中运行,不会阻塞主线程,@EnableAsync 需要在配置类中启用
    11.     public void start() {  // 启动方法,在应用启动时执行
    12.         // 无限循环,不断从 Redis Stream 中读取消息(可以改为定时任务等方式)
    13.         while (true) {
    14.             try {
    15.                 // 设置 Stream 读取的阻塞超时,设置最多等待 2 秒
    16.                 StreamReadOptions options = StreamReadOptions.empty().block(Duration.ofSeconds(2));
    17.                 // 从指定的消费者组中读取消息,">" 表示只消费未被消费过的消息
    18.                 List<MapRecord<String, Object, Object>> messages = redisTemplate.opsForStream().read(
    19.                         Consumer.from(GROUP_NAME, CONSUMER_NAME),  // 指定消费者组和消费者名称
    20.                         options,  // 设置读取选项,包含阻塞时间
    21.                         StreamOffset.create(STREAM_KEY, ReadOffset.lastConsumed())  // 从最后消费的消息开始读取
    22.                 );
    23.                 // 如果没有消息,继续循环读取
    24.                 if (messages == null || messages.isEmpty()) {
    25.                     continue;  
    26.                 }
    27.                 // 处理每一条读取到的消息
    28.                 for (MapRecord<String, Object, Object> message : messages) {
    29.                     String messageId = message.getId();  // 获取消息的唯一标识符(ID)
    30.                     Map<Object, Object> value = message.getValue();  // 获取消息内容(以 Map 形式存储)
    31.                     log.info("接收到消息,id={},内容={}", messageId, value);  // 打印日志,记录消息 ID 和内容
    32.                     // 在这里加入业务逻辑处理
    33.                     // 例如处理消息并执行相应的操作
    34.                     // ...

    35.                     // 消息处理成功后,需要确认消息已经被消费(通过 XACK 命令)
    36.                     redisTemplate.opsForStream().acknowledge(STREAM_KEY, GROUP_NAME, messageId);  // 确认消费的消息
    37.                 }
    38.             } catch (Exception e) {
    39.                 log.error("读取 Redis Stream 消息异常", e);  // 异常捕获,记录错误日志
    40.             }
    41.         }
    42.     }
    43. }
    复制代码
    1. MapRecord<String, Object, Object>
    复制代码
    是 Spring Data Redis 用来表示 Redis Stream 中的 消息记录 的类。它不仅包含了消息的 ID,还包含了消息的内容(即消息数据)。
    在 Redis 中,每条消息都存储为一个 key-value 对。
    主要功能:

    • 封装消息 ID 和消息内容
      1. MapRecord
      复制代码
      用来封装消息的 ID 和消息的内容。
    • 消息的内容:消息的内容通常是一个 键值对
      1. Map<String, Object>
      复制代码
      ),可以是任意对象的数据结构(例如,JSON、Map 或其他序列化对象)。
    字段:

      1. getId()
      复制代码
      :返回消息的 ID(
      1. RecordId
      复制代码
      类型)。
      1. getValue()
      复制代码
      :返回消息的内容,以
      1. Map<Object, Object>
      复制代码
      的形式。
    使用场景:
    1. MapRecord
    复制代码
    是用来表示从 Stream 中读取到的消息,它将消息的 ID 和内容(键值对)封装在一起。你可以使用
    1. MapRecord
    复制代码
    来获取消息的 ID 和内容并处理。
    1. MapRecord<String, Object, Object> message = redisTemplate.opsForStream().read(Consumer.from("mygroup", "consumer1"), options, StreamOffset.create("mystream", ReadOffset.lastConsumed()));
    复制代码
    在这个例子中,
    1. message
    复制代码
    是一个
    1. MapRecord
    复制代码
    实例,它封装了从
    1. mystream
    复制代码
    队列中读取到的消息。我们可以通过
    1. message.getId()
    复制代码
    获取消息 ID,通过
    1. message.getValue()
    复制代码
    获取消息内容。
    在消费者中,我们使用
    1. MapRecord<String, Object, Object>
    复制代码
    来封装消息,获取
    1. message.getId()
    复制代码
    来获取消息的 ID(
    1. RecordId
    复制代码
    ),以及通过
    1. message.getValue()
    复制代码
    获取消息的内容。 随后在处理完消息后,调用
    1. acknowledge()
    复制代码
    来确认消息已经被消费。
    最后启动异步支持:
    1. @SpringBootApplication
    2. @EnableAsync // 启动异步支持
    3. public class MyApplication {
    4.     public static void main(String[] args) {
    5.         SpringApplication.run(MyApplication.class, args);
    6.     }
    7. }
    复制代码
    通过这种方式,Spring Data Redis 提供了高效且类型安全的接口来操作 Redis Stream,帮助我们在分布式系统中实现高效的消息队列。

    总结

    以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。

    来源:https://www.jb51.net/database/335149zev.htm
    免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

    本帖子中包含更多资源

    您需要 登录 才可以下载或查看,没有账号?立即注册

    ×

    最新评论

    QQ Archiver 手机版 小黑屋 福建二哥 ( 闽ICP备2022004717号|闽公网安备35052402000345号 )

    Powered by Discuz! X3.5 © 2001-2023

    快速回复 返回顶部 返回列表