• 设为首页
  • 收藏本站
  • 积分充值
  • VIP赞助
  • 手机版
  • 微博
  • 微信
    微信公众号 添加方式:
    1:搜索微信号(888888
    2:扫描左侧二维码
  • 快捷导航
    福建二哥 门户 查看主题

    Redis消息队列实现异步秒杀功能

    发布者: 皮3591 | 发布时间: 2025-6-19 12:36| 查看数: 35| 评论数: 0|帖子模式

    1 Redis消息队列

    在高并发场景下,为了提高秒杀业务的性能,可将部分工作交给 Redis 处理,并通过异步方式执行。Redis 提供了多种数据结构来实现消息队列,总结三种。

    1.1 List 结构


    • 原理:基于 List 结构模拟消息队列,使用
      1. BRPUSH
      复制代码
      生产消息,
      1. BRPOP
      复制代码
      消费消息。
    • 命令示例

      • 生产消息
        1. BRPUSH key value [value ...]
        复制代码
        ,将一个或多个元素推入到指定列表的头部。如果列表不存在,会自动创建一个新的列表。
      • 消费消息
        1. BRPOP key [key ...] timeout
        复制代码
        ,从指定的一个或多个列表中弹出最后一个元素。如果列表为空,该命令会导致客户端阻塞,直到有数据可用或超过指定的超时时间。

    • 优缺点
    • 优点:不会内存超限、可以持久化、消息有序性。
    • 缺点:无法避免数据丢失、只支持单消费者。

    1.2 Pub/Sub 模式


    • 原理:发布订阅模式,基本的点对点消息模型,支持多生产、多消费者。
    • 命令示例

      • 生产消息
        1. PUBLISH channel message
        复制代码
        ,用于向指定频道发布一条消息。
      • 消费消息
        1. SUBSCRIBE channel [channel]
        复制代码
        :订阅一个或多个频道。

          1. UNSUBSCRIBE [channel [channel ...]]
          复制代码
          :取消订阅一个或多个频道。
          1. PSUBSCRIBE pattern [pattern ...]
          复制代码
          :订阅一个或多个符合给定模式的频道,接收消息。
          1. PUNSUBSCRIBE [pattern [pattern ...]]
          复制代码
          :取消订阅一个或多个符合给定模式的频道。


    • 优缺点

      • 优点:支持多生产、多消费者。
      • 缺点:不支持持久化、无法避免数据丢失,消息堆积有上限(消费者会缓存消息),超出会丢失消息。


    1.3 Stream 结构


    • 原理:Redis 5.0 引入的专门为消息队列设计的数据类型,支持消息可回溯、一个消息可以被多个消费者消费、可以阻塞读取。
    • 命令示例

      • 生产消息
        1. XADD key *|ID value [value ...]
        复制代码
        ,向指定的 Stream 流中添加一个消息。例如:
        1. XADD users * name jack age 21
        复制代码
        ,创建名为
        1. users
        复制代码
        的队列,并向其中发送一个消息,内容是
        1. {name=jack,age=21}
        复制代码
        ,使用 Redis 自动生成 ID。
      • 消费消息
        1. XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID ID
        复制代码
        。例如:

          1. XREAD COUNT 1 STREAMS users 0
          复制代码
          :读取
          1. users
          复制代码
          队列中的第一条消息。
          1. XREAD COUNT 1 BLOCK 1000 STREAMS users $
          复制代码
          :阻塞 1 秒钟后从
          1. users
          复制代码
          队列中读取最新消息。


    • 消费者组模式

      • 特点:消息分流、消息标识、消息确认。
      • 命令示例

          1. XGROUP CREATE key groupName ID
          复制代码
          :创建消费者组。
          1. XGROUP DESTORY key groupName
          复制代码
          :删除指定的消费者组。
          1. XGROUP CREATECONSUMER key groupName consumerName
          复制代码
          :给指定的消费者组添加消费者。
          1. XGROUP DELCONSUMER key groupName consumerName
          复制代码
          :删除消费者组中指定消费者。
          1. XREADGROUP GROUP
          复制代码
          :从消费者组中读取消息。


    • 优缺点

      • 优点:消息可回溯、可以多消费者争抢消息,加快消费速度、可以阻塞读取、没有消息漏读的风险、有消息确认机制,保证消息至少被消费一次。
      • 缺点:有消息漏读的风险(单消费方式下)。


    1.4 Redis Stream消息队列的特点

    Redis 5.0引入的
    1. Stream
    复制代码
    类型是专门为消息队列设计的,支持以下特性:

    • 消息持久化:消息存储在内存中,支持持久化到磁盘,避免消息丢失。
    • 消费者组(Consumer Group)

      • 消息分流:一个队列可以被多个消费者组订阅,组内多个消费者分摊消息处理。
      • 消息回溯:支持按消息ID回溯历史消息。
      • 消息确认(ACK):消费者处理完消息后需确认,否则消息会进入
        1. pending-list
        复制代码
        等待重试。

    • 阻塞读取:消费者可以阻塞等待新消息,减少CPU空转。
    • 避免消息丢失:通过
      1. pending-list
      复制代码
      机制,确保消息至少被消费一次。

    2 秒杀业务处理



    2.1 使用Lua脚本处理库存和订单

    目标:在Redis中完成库存判断和订单校验,确保原子性。
    1. -- 参数:优惠券ID、用户ID、订单ID
    2. local voucherId = ARGV[1]
    3. local userId = ARGV[2]
    4. local orderId = ARGV[3]
    5. -- 库存Key和订单Key
    6. local stockKey = 'seckill:stock:' .. voucherId
    7. local orderKey = 'seckill:order:' .. voucherId
    8. -- 判断库存是否充足
    9. if (tonumber(redis.call('GET', stockKey)) <= 0 then
    10.     return 1 -- 库存不足
    11. end
    12. -- 判断用户是否已下单
    13. if (redis.call('SISMEMBER', orderKey, userId) == 1 then
    14.     return 2 -- 用户已下单
    15. end
    16. -- 扣减库存并记录订单
    17. redis.call('DECR', stockKey)
    18. redis.call('SADD', orderKey, userId)
    19. -- 将订单信息发送到消息队列
    20. redis.call('XADD', 'stream.orders', '*', 'userId', userId, 'voucherId', voucherId, 'id', orderId)
    21. return 0 -- 成功
    复制代码
    脚本说明

    • 原子性操作:库存检查、订单校验、消息发送在一个脚本中完成。
    • 消息发送:使用
      1. XADD
      复制代码
      将订单信息写入
      1. stream.orders
      复制代码
      队列。

    2.2 创建消费者组


      1. XGROUP CREATE stream.orders g1 0 MKSTREAM
      复制代码
    1. g1
    复制代码
    :消费者组名称。
    1. MKSTREAM
    复制代码
    :如果队列不存在则自动创建。

    2.3 Java代码实现


      1. init
      复制代码
      方法:在类初始化时创建消息队列,并启动一个线程任务从消息队列中获取订单信息。
      1. VoucherOrderHandler
      复制代码
      类:实现
      1. Runnable
      复制代码
      接口,作为线程任务,不断从消息队列中获取订单信息。如果获取成功,将消息转换为
      1. VoucherOrder
      复制代码
      对象,调用
      1. handleVoucherOrder
      复制代码
      方法处理订单,并进行 ACK 确认;如果出现异常,调用
      1. handlePendingList
      复制代码
      方法处理异常消息。
      1. handlePendingList
      复制代码
      方法:从
      1. pendingList
      复制代码
      中获取订单信息,处理订单并进行 ACK 确认,直到
      1. pendingList
      复制代码
      中没有消息。
      1. handleVoucherOrder
      复制代码
      方法:使用 Redisson 分布式锁确保一人一单,调用代理对象的
      1. createVoucherOrder
      复制代码
      方法创建订单。
      1. seckillVoucher
      复制代码
      方法:执行 Lua 脚本判断用户是否具有秒杀资格,如果具有资格,将订单信息发送到消息队列,并返回下单成功信息。
      1. createVoucherOrder
      复制代码
      方法:判断当前用户是否是第一单,如果是则扣减库存并将订单保存到数据库。

    系统启动与初始化

    系统启动时,
    1. VoucherOrderServiceImpl
    复制代码
    类的
    1. @PostConstruct
    复制代码
    注解会触发
    1. init
    复制代码
    方法执行。该方法先加载创建消息队列的 Lua 脚本,通过
    1. stringRedisTemplate.execute
    复制代码
    方法执行脚本创建 Redis Stream 消息队列和消费者组。若创建成功或队列已存在,会记录相应日志。之后,使用线程池
    1. SECKILL_ORDER_EXECUTOR
    复制代码
    启动
    1. VoucherOrderHandler
    复制代码
    线程,该线程负责后续从消息队列获取订单信息并处理。

    用户发起秒杀请求

    用户发起秒杀请求后,系统调用
    1. VoucherOrderServiceImpl
    复制代码
    1. seckillVoucher
    复制代码
    方法。此方法先从
    1. ThreadLocalUtls
    复制代码
    中获取用户 ID,用
    1. redisIdWorker
    复制代码
    生成订单 ID。接着执行判断用户秒杀资格的 Lua 脚本,该脚本接收优惠券 ID、用户 ID 和订单 ID 作为参数。若脚本返回值表明库存不足或用户已下单,方法返回相应的失败提示;若返回值为 0,说明用户有秒杀资格,创建代理对象并返回下单成功结果。

    Lua 脚本执行逻辑

    Lua 脚本接收到参数后,根据优惠券 ID 拼接库存和订单的 Redis key。先通过
    1. GET
    复制代码
    命令获取库存,若库存小于等于 0 则返回 1 表示库存不足。若库存充足,使用
    1. SISMEMBER
    复制代码
    命令检查用户是否已下单,若已下单则返回 2。若库存充足且用户未下单,使用
    1. INCRBY
    复制代码
    命令扣减库存,
    1. SADD
    复制代码
    命令记录订单信息,最后返回 0 表示下单成功。

    订单处理线程工作
    1. VoucherOrderHandler
    复制代码
    线程启动后进入无限循环,不断从 Redis Stream 消息队列获取订单信息。若未获取到消息,继续下一次循环;若获取到消息,将消息转换为
    1. VoucherOrder
    复制代码
    对象,调用
    1. handleVoucherOrder
    复制代码
    方法处理订单,处理完成后向消息队列发送 ACK 确认消息。若处理过程中出现异常,调用
    1. handlePendingList
    复制代码
    方法处理异常消息。

    订单处理方法 handleVoucherOrder
    1. handleVoucherOrder
    复制代码
    方法接收
    1. VoucherOrder
    复制代码
    对象,根据用户 ID 获取 Redisson 分布式锁。尝试获取锁,若失败记录错误日志并返回;若成功,调用代理对象的
    1. createVoucherOrder
    复制代码
    方法创建订单,最后释放锁。

    订单创建方法 createVoucherOrder

    该方法先判断当前用户是否是第一单,通过查询数据库中该用户的订单数量来判断。若不是第一单,记录错误日志并返回;若是第一单,尝试扣减秒杀券库存,若扣减失败抛出异常。若库存扣减成功,将订单信息保存到数据库,若保存失败也抛出异常。
    1. @Service
    2. public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
    3.     @Resource
    4.     private ISeckillVoucherService seckillVoucherService;
    5.     @Resource
    6.     private RedisIdWorker redisIdWorker;
    7.     @Resource
    8.     private StringRedisTemplate stringRedisTemplate;
    9.     @Resource
    10.     private RedissonClient redissonClient;
    11.     /**
    12.      * 当前类初始化完毕就立马执行该方法
    13.      */
    14.     @PostConstruct
    15.     private void init() {
    16.         // 创建消息队列
    17.         DefaultRedisScript<Long> mqScript = new DefaultRedisScript<>();
    18.         mqScript.setLocation(new ClassPathResource("lua/stream-mq.lua"));
    19.         mqScript.setResultType(Long.class);
    20.         Long result = null;
    21.         try {
    22.             result = stringRedisTemplate.execute(mqScript,
    23.                     Collections.emptyList(),
    24.                     QUEUE_NAME,
    25.                     GROUP_NAME);
    26.         } catch (Exception e) {
    27.             log.error("队列创建失败", e);
    28.             return;
    29.         }
    30.         int r = result.intValue();
    31.         String info = r == 1 ? "队列创建成功" : "队列已存在";
    32.         log.debug(info);
    33.         // 执行线程任务
    34.         SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
    35.     }
    36.     /**
    37.      * 线程池
    38.      */
    39.     private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();
    40.     /**
    41.      * 队列名
    42.      */
    43.     private static final String QUEUE_NAME = "stream.orders";
    44.     /**
    45.      * 组名
    46.      */
    47.     private static final String GROUP_NAME = "g1";
    48.     /**
    49.      * 线程任务: 不断从消息队列中获取订单
    50.      */
    51.     private class VoucherOrderHandler implements Runnable {
    52.         @Override
    53.         public void run() {
    54.             while (true) {
    55.                 try {
    56.                     // 1、从消息队列中获取订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 1000 STREAMS streams.order >
    57.                     List<MapRecord<String, Object, Object>> messageList = stringRedisTemplate.opsForStream().read(
    58.                             Consumer.from(GROUP_NAME, "c1"),
    59.                             StreamReadOptions.empty().count(1).block(Duration.ofSeconds(1)),
    60.                             StreamOffset.create(QUEUE_NAME, ReadOffset.lastConsumed())
    61.                     );
    62.                     // 2、判断消息获取是否成功
    63.                     if (messageList == null || messageList.isEmpty()) {
    64.                         // 2.1 消息获取失败,说明没有消息,进入下一次循环获取消息
    65.                         continue;
    66.                     }
    67.                     // 3、消息获取成功,可以下单
    68.                     // 将消息转成VoucherOrder对象
    69.                     MapRecord<String, Object, Object> record = messageList.get(0);
    70.                     Map<Object, Object> messageMap = record.getValue();
    71.                     VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(messageMap, new VoucherOrder(), true);
    72.                     handleVoucherOrder(voucherOrder);
    73.                     // 4、ACK确认 SACK stream.orders g1 id
    74.                     stringRedisTemplate.opsForStream().acknowledge(QUEUE_NAME, GROUP_NAME, record.getId());
    75.                 } catch (Exception e) {
    76.                     log.error("处理订单异常", e);
    77.                     // 处理异常消息
    78.                     handlePendingList();
    79.                 }
    80.             }
    81.         }
    82.     }
    83.     private void handlePendingList() {
    84.         while (true) {
    85.             try {
    86.                 // 1、从pendingList中获取订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 1000 STREAMS streams.order 0
    87.                 List<MapRecord<String, Object, Object>> messageList = stringRedisTemplate.opsForStream().read(
    88.                         Consumer.from(GROUP_NAME, "c1"),
    89.                         StreamReadOptions.empty().count(1).block(Duration.ofSeconds(1)),
    90.                         StreamOffset.create(QUEUE_NAME, ReadOffset.from("0"))
    91.                 );
    92.                 // 2、判断pendingList中是否有效性
    93.                 if (messageList == null || messageList.isEmpty()) {
    94.                     // 2.1 pendingList中没有消息,直接结束循环
    95.                     break;
    96.                 }
    97.                 // 3、pendingList中有消息
    98.                 // 将消息转成VoucherOrder对象
    99.                 MapRecord<String, Object, Object> record = messageList.get(0);
    100.                 Map<Object, Object> messageMap = record.getValue();
    101.                 VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(messageMap, new VoucherOrder(), true);
    102.                 handleVoucherOrder(voucherOrder);
    103.                 // 4、ACK确认 SACK stream.orders g1 id
    104.                 stringRedisTemplate.opsForStream().acknowledge(QUEUE_NAME, GROUP_NAME, record.getId());
    105.             } catch (Exception e) {
    106.                 log.error("处理订单异常", e);
    107.                 // 这里不用调自己,直接就进入下一次循环,再从pendingList中取,这里只需要休眠一下,防止获取消息太频繁
    108.                 try {
    109.                     Thread.sleep(20);
    110.                 } catch (InterruptedException ex) {
    111.                     log.error("线程休眠异常", ex);
    112.                 }
    113.             }
    114.         }
    115.     }
    116.     /**
    117.      * 创建订单
    118.      *
    119.      * @param voucherOrder
    120.      */
    121.     private void handleVoucherOrder(VoucherOrder voucherOrder) {
    122.         Long userId = voucherOrder.getUserId();
    123.         RLock lock = redissonClient.getLock(RedisConstants.LOCK_ORDER_KEY + userId);
    124.         boolean isLock = lock.tryLock();
    125.         if (!isLock) {
    126.             // 索取锁失败,重试或者直接抛异常(这个业务是一人一单,所以直接返回失败信息)
    127.             log.error("一人只能下一单");
    128.             return;
    129.         }
    130.         try {
    131.             // 创建订单(使用代理对象调用,是为了确保事务生效)
    132.             proxy.createVoucherOrder(voucherOrder);
    133.         } finally {
    134.             lock.unlock();
    135.         }
    136.     }
    137.     /**
    138.      * 加载 判断秒杀券库存是否充足 并且 判断用户是否已下单 的Lua脚本
    139.      */
    140.     private static final DefaultRedisScript<Long> SECKILL_SCRIPT;
    141.     static {
    142.         SECKILL_SCRIPT = new DefaultRedisScript<>();
    143.         SECKILL_SCRIPT.setLocation(new ClassPathResource("lua/stream-seckill.lua"));
    144.         SECKILL_SCRIPT.setResultType(Long.class);
    145.     }
    146.     /**
    147.      * VoucherOrderServiceImpl类的代理对象
    148.      * 将代理对象的作用域进行提升,方面子线程取用
    149.      */
    150.     private IVoucherOrderService proxy;
    151.     /**
    152.      * 抢购秒杀券
    153.      *
    154.      * @param voucherId
    155.      * @return
    156.      */
    157.     @Transactional
    158.     @Override
    159.     public Result seckillVoucher(Long voucherId) {
    160.         Long userId = ThreadLocalUtls.getUser().getId();
    161.         long orderId = redisIdWorker.nextId(SECKILL_VOUCHER_ORDER);
    162.         // 1、执行Lua脚本,判断用户是否具有秒杀资格
    163.         Long result = null;
    164.         try {
    165.             result = stringRedisTemplate.execute(
    166.                     SECKILL_SCRIPT,
    167.                     Collections.emptyList(),
    168.                     voucherId.toString(),
    169.                     userId.toString(),
    170.                     String.valueOf(orderId)
    171.             );
    172.         } catch (Exception e) {
    173.             log.error("Lua脚本执行失败");
    174.             throw new RuntimeException(e);
    175.         }
    176.         if (result != null && !result.equals(0L)) {
    177.             // result为1表示库存不足,result为2表示用户已下单
    178.             int r = result.intValue();
    179.             return Result.fail(r == 2 ? "不能重复下单" : "库存不足");
    180.         }
    181.         // 2、result为0,下单成功,直接返回ok
    182.         // 索取锁成功,创建代理对象,使用代理对象调用第三方事务方法, 防止事务失效
    183.         IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
    184.         this.proxy = proxy;
    185.         return Result.ok();
    186.     }
    187.     /**
    188.      * 创建订单
    189.      *
    190.      * @param voucherOrder
    191.      * @return
    192.      */
    193.     @Transactional
    194.     @Override
    195.     public void createVoucherOrder(VoucherOrder voucherOrder) {
    196.         Long userId = voucherOrder.getUserId();
    197.         Long voucherId = voucherOrder.getVoucherId();
    198.         // 1、判断当前用户是否是第一单
    199.         int count = this.count(new LambdaQueryWrapper<VoucherOrder>()
    200.                 .eq(VoucherOrder::getUserId, userId));
    201.         if (count >= 1) {
    202.             // 当前用户不是第一单
    203.             log.error("当前用户不是第一单");
    204.             return;
    205.         }
    206.         // 2、用户是第一单,可以下单,秒杀券库存数量减一
    207.         boolean flag = seckillVoucherService.update(new LambdaUpdateWrapper<SeckillVoucher>()
    208.                 .eq(SeckillVoucher::getVoucherId, voucherId)
    209.                 .gt(SeckillVoucher::getStock, 0)
    210.                 .setSql("stock = stock -1"));
    211.         if (!flag) {
    212.             throw new RuntimeException("秒杀券扣减失败");
    213.         }
    214.         // 3、将订单保存到数据库
    215.         flag = this.save(voucherOrder);
    216.         if (!flag) {
    217.             throw new RuntimeException("创建秒杀券订单失败");
    218.         }
    219.     }
    220. }
    复制代码
    3 秒杀流程剖析


    3.1 初始化操作

    Lua 脚本准备:编写 Lua 脚本,接收优惠券 ID 和用户 ID 作为参数,判断库存是否充足以及用户是否已下单。若库存不足返回 1,用户已下单返回 2,下单成功返回 0。
    1. -- 优惠券id
    2. local voucherId = ARGV[1];
    3. -- 用户id
    4. local userId = ARGV[2];
    5. local stockKey = 'seckill:stock:' .. voucherId;
    6. local orderKey = 'seckill:order:' .. voucherId;
    7. local stock = redis.call('GET', stockKey);
    8. if (tonumber(stock) <= 0) then
    9.     return 1;
    10. end
    11. if (redis.call('SISMEMBER', orderKey, userId) == 1) then
    12.     return 2;
    13. end
    14. redis.call('INCRBY', stockKey, -1);
    15. redis.call('SADD', orderKey, userId);
    16. return 0;
    复制代码
    消息队列创建:在 Java 代码的
    1. @PostConstruct
    复制代码
    方法中,通过执行 Lua 脚本创建 Redis 的 Stream 消息队列和消费者组。
    1. @PostConstruct
    2. private void init() {
    3.     DefaultRedisScript<Long> mqScript = new DefaultRedisScript<>();
    4.     mqScript.setLocation(new ClassPathResource("lua/stream-mq.lua"));
    5.     mqScript.setResultType(Long.class);
    6.     Long result = stringRedisTemplate.execute(mqScript, Collections.emptyList(), QUEUE_NAME, GROUP_NAME);
    7.     if (result == 1) {
    8.         log.debug("队列创建成功");
    9.     } else {
    10.         log.debug("队列已存在");
    11.     }
    12.     SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
    13. }
    复制代码
    3.2 秒杀请求处理

    资格判断:用户发起秒杀请求,系统执行 Lua 脚本,根据返回结果判断用户是否具有秒杀资格。若返回 1 表示库存不足,返回 2 表示用户已下单,均返回失败信息;返回 0 则表示具有秒杀资格。
    1. @Override
    2. public Result seckillVoucher(Long voucherId) {
    3.     Long userId = ThreadLocalUtls.getUser().getId();
    4.     long orderId = redisIdWorker.nextId(SECKILL_VOUCHER_ORDER);
    5.     Long result = stringRedisTemplate.execute(SECKILL_SCRIPT, Collections.emptyList(),
    6.                                             voucherId.toString(), userId.toString(), String.valueOf(orderId));
    7.     if (result != 0) {
    8.         return Result.fail(result == 2 ? "不能重复下单" : "库存不足");
    9.     }
    10.     IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
    11.     this.proxy = proxy;
    12.     return Result.ok();
    13. }
    复制代码
    订单入队:具有秒杀资格后,生成订单 ID,创建订单对象,将订单信息发送到 Redis 的 Stream 消息队列。

    3.3 消息队列消费

    订单处理线程:使用线程池启动一个线程任务
    1. VoucherOrderHandler
    复制代码
    ,不断从消息队列中获取订单信息。
    1. private class VoucherOrderHandler implements Runnable {
    2.     @Override
    3.     public void run() {
    4.         while (true) {
    5.             try {
    6.                 List<MapRecord<String, Object, Object>> messageList = stringRedisTemplate.opsForStream().read(
    7.                     Consumer.from(GROUP_NAME, "c1"),
    8.                     StreamReadOptions.empty().count(1).block(Duration.ofSeconds(1)),
    9.                     StreamOffset.create(QUEUE_NAME, ReadOffset.lastConsumed())
    10.                 );
    11.                 if (messageList == null || messageList.isEmpty()) {
    12.                     continue;
    13.                 }
    14.                 MapRecord<String, Object, Object> record = messageList.get(0);
    15.                 Map<Object, Object> messageMap = record.getValue();
    16.                 VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(messageMap, new VoucherOrder(), true);
    17.                 handleVoucherOrder(voucherOrder);
    18.                 stringRedisTemplate.opsForStream().acknowledge(QUEUE_NAME, GROUP_NAME, record.getId());
    19.             } catch (Exception e) {
    20.                 log.error("处理订单异常", e);
    21.                 handlePendingList();
    22.             }
    23.         }
    24.     }
    25. }
    复制代码
    异常处理:若处理订单过程中出现异常,调用
    1. handlePendingList
    复制代码
    方法从
    1. pendingList
    复制代码
    中获取未处理的订单信息,继续处理。

    3.4 订单创建

    分布式锁保障:使用 Redisson 分布式锁,确保同一用户同一时间只能创建一个订单,避免一人多单问题。
    1. private void handleVoucherOrder(VoucherOrder voucherOrder) {
    2.     Long userId = voucherOrder.getUserId();
    3.     RLock lock = redissonClient.getLock(RedisConstants.LOCK_ORDER_KEY + userId);
    4.     boolean isLock = lock.tryLock();
    5.     if (!isLock) {
    6.         log.error("一人只能下一单");
    7.         return;
    8.     }
    9.     try {
    10.         proxy.createVoucherOrder(voucherOrder);
    11.     } finally {
    12.         lock.unlock();
    13.     }
    14. }
    复制代码
    数据库操作:判断用户是否是第一单,若是则扣减库存并将订单保存到数据库。
    1. @Override
    2. public void createVoucherOrder(VoucherOrder voucherOrder) {
    3.     Long userId = voucherOrder.getUserId();
    4.     Long voucherId = voucherOrder.getVoucherId();
    5.     int count = this.count(new LambdaQueryWrapper<VoucherOrder>().eq(VoucherOrder::getUserId, userId));
    6.     if (count >= 1) {
    7.         log.error("当前用户不是第一单");
    8.         return;
    9.     }
    10.     boolean flag = seckillVoucherService.update(new LambdaUpdateWrapper<SeckillVoucher>()
    11.         .eq(SeckillVoucher::getVoucherId, voucherId)
    12.         .gt(SeckillVoucher::getStock, 0)
    13.         .setSql("stock = stock -1"));
    14.     if (!flag) {
    15.         throw new RuntimeException("秒杀券扣减失败");
    16.     }
    17.     flag = this.save(voucherOrder);
    18.     if (!flag) {
    19.         throw new RuntimeException("创建秒杀券订单失败");
    20.     }
    21. }
    复制代码
    4 秒杀流程(文字版)


    1. 初始化准备

    在系统启动阶段,我们会完成一些必要的初始化工作。一方面,编写好用于判断库存和订单情况的 Lua 脚本。这个脚本会接收优惠券 ID 和用户 ID 作为参数,通过 Redis 的相关命令判断库存是否充足以及用户是否已下单,保证这些判断操作的原子性。另一方面,在 Java 代码里利用
    1. @PostConstruct
    复制代码
    注解,通过执行另一个 Lua 脚本来创建 Redis 的 Stream 消息队列和消费者组,为后续处理订单消息做好准备。

    2. 用户请求与资格判断

    当用户发起秒杀请求后,系统会立即执行之前准备好的 Lua 脚本来判断用户是否具有秒杀资格。

    • 如果脚本返回库存不足的标识,系统会迅速返回 “库存不足” 的提示信息,结束本次请求处理。
    • 若返回用户已下单的标识,就会返回 “不能重复下单” 的提示,流程终止。
    • 当判定用户具有秒杀资格时,系统会生成唯一的订单 ID,创建订单对象,然后将订单信息发送到 Redis 的 Stream 消息队列,进入异步处理阶段。

    3. 消息队列消费

    有一个专门的消息队列消费者线程会持续监听 Redis 的 Stream 消息队列。

    • 如果没有获取到新的订单信息,线程会继续保持监听状态。
    • 一旦获取到订单信息,线程会马上尝试获取 Redisson 分布式锁。这个锁非常关键,它能确保同一用户同一时间只能处理一个订单,有效避免一人多单的问题。

    4. 订单创建与处理

    获取到锁之后,系统会进一步处理订单。

    • 首先判断当前用户是否是第一单。如果不是,系统会记录错误日志并释放锁,结束流程。
    • 若是第一单,系统会尝试扣减库存。如果库存扣减失败,会抛出异常并释放锁;若扣减成功,就将订单信息保存到数据库。
    • 在保存订单时,若保存失败会抛出异常并释放锁;保存成功后,系统会向 Redis 的 Stream 消息队列发送 ACK 确认消息,最后释放锁,完成整个秒杀流程。
    到此这篇关于Redis消息队列实现异步秒杀的文章就介绍到这了,更多相关Redis异步秒杀内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

    来源:https://www.jb51.net/database/3401497d3.htm
    免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

    本帖子中包含更多资源

    您需要 登录 才可以下载或查看,没有账号?立即注册

    ×

    最新评论

    浏览过的版块

    QQ Archiver 手机版 小黑屋 福建二哥 ( 闽ICP备2022004717号|闽公网安备35052402000345号 )

    Powered by Discuz! X3.5 © 2001-2023

    快速回复 返回顶部 返回列表