• 设为首页
  • 收藏本站
  • 积分充值
  • VIP赞助
  • 手机版
  • 微博
  • 微信
    微信公众号 添加方式:
    1:搜索微信号(888888
    2:扫描左侧二维码
  • 快捷导航
    福建二哥 门户 查看主题

    Redis如何使用Pipeline实现批处理操作

    发布者: 雪落无声 | 发布时间: 2025-6-19 12:44| 查看数: 94| 评论数: 0|帖子模式

    在正常情况下,我们每次发送 Redis 命令时,客户端会等待 Redis 服务器的响应,直到接收到结果后,才会发送下一个命令。这种方式虽然保证了操作的顺序性,但在执行大量命令时会产生很大的网络延迟。
    通过 Pipeline 技术,我们的客户端可以将多个命令同时发送给 Redis 服务器,并且不需要等待每个命令的返回结果,直到所有命令都被执行完毕,客户端再一起获取返回值。这样能减少每个命令的等待时间,大幅提高执行效率。
    Redis Pipeline 是一种优化 Redis 操作的机制,通过将多个命令打包发送到 Redis 服务器,减少客户端与服务器之间的网络往返时间(RTT),从而显著提升性能。
    在默认情况下,Redis 客户端与服务器之间的通信是请求-响应模式,即:
    1客户端发送一个命令到服务器。
    2.服务器执行命令并返回结果。
    3.客户端等待响应后再发送下一个命令。
    这种模式在命令数量较少时没有问题,但在需要执行大量命令时,网络往返时间(RTT)会成为性能瓶颈。所以我们需要实现下面目的:
    1.将多个命令打包发送到服务器。
    2.服务器依次执行这些命令,并将结果一次性返回给客户端。
    3.减少网络开销,提升性能。
    以下是一个简单的 Java 示例,展示了如何使用 Jedis(Redis 的一个 Java 客户端)执行 Pipeline:
    注意:批处理时不建议一次携带太多命令,并且Pipeline的多个命令之间不具备原子性。
    1. // 创建 Jedis 实例
    2. Jedis jedis = new Jedis("localhost", 6379);

    3. // 使用 pipelining 方式批量执行命令
    4. Pipeline pipeline = jedis.pipelined();

    5. // 批量操作:使用 pipeline 来缓存命令
    6. for (int i = 0; i < 1000; i++) {
    7.     pipeline.set("key" + i, "value" + i);
    8. }

    9. // 同步执行所有命令
    10. pipeline.sync();
    复制代码
    pipelined() 方法: 创建一个 Pipeline 对象,它缓存所有要执行的命令。
    批量设置命令: 通过 pipeline.set() 将多个 SET 命令放入管道中,但命令并不会立即执行。
    sync() 方法: 通过调用 sync() 方法,客户端将会把所有缓存的命令一次性发送给 Redis,并等待它们完成执行。
    但是这些都是在单机模式下的批处理,那对于集群来说该如何使用呢?
    向MSet或Pipeline这样的批处理需要在一次请求中携带多条命令,而此时如何Redis是一个集群,那批处理命令的多个key必须落在同一个插槽中,否则就会导致执行失败。

    一般推荐使用并行插槽来解决,如果使用hash_tag,可能会出现大量的key分配同一插槽导致数据倾斜,而并行插槽不会。
    那么这里我们模拟一下并行插槽实现:
    将多个键值对按照Redis集群的槽位进行分组,然后分别使用jedisCluster.mset()方法按组设置键值对。
    1. public class JedisClusterTest {

    2.     // 声明一个JedisCluster对象,用于与Redis集群进行交互
    3.     private JedisCluster jedisCluster;

    4.     // 在每个测试方法执行之前,初始化JedisCluster连接
    5.     @BeforeEach
    6.     void setUp() {
    7.         // 配置Jedis连接池
    8.         JedisPoolConfig poolConfig = new JedisPoolConfig();
    9.         poolConfig.setMaxTotal(8);
    10.         poolConfig.setMaxIdle(8);
    11.         poolConfig.setMinIdle(0);
    12.         poolConfig.setMaxWaitMillis(1000);

    13.         // 创建一个HashSet,用于存储Redis集群的节点信息
    14.         HashSet<HostAndPort> nodes = new HashSet<>();
    15.         // 添加Redis集群的节点信息(IP和端口)
    16.         nodes.add(new HostAndPort("192.168.150.101", 7001));
    17.         nodes.add(new HostAndPort("192.168.150.101", 7002));
    18.         nodes.add(new HostAndPort("192.168.150.101", 7003));
    19.         nodes.add(new HostAndPort("192.168.150.101", 8001));
    20.         nodes.add(new HostAndPort("192.168.150.101", 8002));
    21.         nodes.add(new HostAndPort("192.168.150.101", 8003));

    22.         // 使用配置的连接池和节点信息初始化JedisCluster对象
    23.         jedisCluster = new JedisCluster(nodes, poolConfig);
    24.     }

    25.     // 测试方法:使用mset命令一次性设置多个键值对
    26.     @Test
    27.     void testMSet() {
    28.         // 使用JedisCluster的mset方法,一次性设置多个键值对
    29.         // 但是jedisCluster默认是无法解决批处理问题的,需要我们手动解决
    30.         jedisCluster.mset("name", "Jack", "age", "21", "sex", "male");
    31.     }

    32.     // 测试方法:使用mset命令按槽位分组设置多个键值对
    33.     @Test
    34.     void testMSet2() {
    35.         // 创建一个HashMap,用于存储多个键值对
    36.         Map<String, String> map = new HashMap<>(3);
    37.         map.put("name", "Jack");
    38.         map.put("age", "21");
    39.         map.put("sex", "Male");

    40.         // 将map中的键值对按照Redis集群的槽位进行分组
    41.         Map<Integer, List<Map.Entry<String, String>>> result = map.entrySet()
    42.                 .stream()
    43.                 .collect(Collectors.groupingBy(
    44.                         // 使用ClusterSlotHashUtil计算每个键对应的槽位
    45.                         entry -> ClusterSlotHashUtil.calculateSlot(entry.getKey()))
    46.                 );

    47.         // 遍历按哈希槽分组后的结果
    48.         for (List<Map.Entry<String, String>> list : result.values()) {
    49.             // 创建一个数组用于批量设置Redis的键值对
    50.             String[] arr = new String[list.size() * 2];  // 每个键值对包含两个元素
    51.             int j = 0;  // 索引变量,用于在数组中定位位置
    52.             for (int i = 0; i < list.size(); i++) {
    53.                 j = i << 1;  // 通过位移计算数组中的位置
    54.                 Map.Entry<String, String> e = list.get(i);  // 获取当前的键值对
    55.                 arr[j] = e.getKey();  // 将键放入数组中
    56.                 arr[j + 1] = e.getValue();  // 将值放入数组中
    57.             }
    58.             // 批量设置Redis集群中的键值对
    59.             jedisCluster.mset(arr);
    60.         }
    61.     }

    62.     // 在每个测试方法执行之后,关闭JedisCluster连接
    63.     @AfterEach
    64.     void tearDown() {
    65.         // 如果JedisCluster对象不为空,则关闭连接
    66.         if (jedisCluster != null) {
    67.             jedisCluster.close();
    68.         }
    69.     }
    70. }
    复制代码
    而在Redis集群环境下,如果需要批量获取多个键的值,可以使用multiGet方法。multiGet是RedisTemplate提供的一个方法,用于一次性获取多个键的值。然而,需要注意的是,multiGet在集群环境下要求所有键必须位于同一个槽位(slot),否则会抛出异常。
    1. @Service
    2. public class RedisService {

    3.     @Autowired
    4.     private RedisTemplate<String, Object> redisTemplate;

    5.     /**
    6.      * 跨槽位批量获取多个键的值
    7.      */
    8.     public Map<String, Object> batchGetCrossSlot(List<String> keys) {
    9.         // 按槽位分组
    10.         Map<Integer, List<String>> slotKeyMap = keys.stream()
    11.                 .collect(Collectors.groupingBy(ClusterSlotHashUtil::calculateSlot));

    12.         // 存储最终结果
    13.         Map<String, Object> result = new HashMap<>();

    14.         // 对每个槽位的键分别调用multiGet
    15.         for (Map.Entry<Integer, List<String>> entry : slotKeyMap.entrySet()) {
    16.             List<String> slotKeys = entry.getValue();
    17.             List<Object> slotValues = redisTemplate.opsForValue().multiGet(slotKeys);

    18.             // 将结果存入Map
    19.             for (int i = 0; i < slotKeys.size(); i++) {
    20.                 result.put(slotKeys.get(i), slotValues.get(i));
    21.             }
    22.         }

    23.         return result;
    24.     }

    25.     /**
    26.      * 测试跨槽位批量获取方法
    27.      */
    28.     public void testBatchGetCrossSlot() {
    29.         List<String> keys = Arrays.asList("name", "age", "sex");
    30.         Map<String, Object> values = batchGetCrossSlot(keys);

    31.         // 打印结果
    32.         values.forEach((key, value) -> {
    33.             System.out.println("Key: " + key + ", Value: " + value);
    34.         });
    35.     }
    36. }
    复制代码
    到此这篇关于Redis如何使用Pipeline实现批处理操作的文章就介绍到这了,更多相关Redis Pipeline批处理操作内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

    来源:https://www.jb51.net/database/335932pl6.htm
    免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

    本帖子中包含更多资源

    您需要 登录 才可以下载或查看,没有账号?立即注册

    ×

    最新评论

    浏览过的版块

    QQ Archiver 手机版 小黑屋 福建二哥 ( 闽ICP备2022004717号|闽公网安备35052402000345号 )

    Powered by Discuz! X3.5 © 2001-2023

    快速回复 返回顶部 返回列表