• 设为首页
  • 收藏本站
  • 积分充值
  • VIP赞助
  • 手机版
  • 微博
  • 微信
    微信公众号 添加方式:
    1:搜索微信号(888888
    2:扫描左侧二维码
  • 快捷导航
    福建二哥 门户 查看主题

    Redis中管道操作pipeline的实现

    发布者: 山止川行 | 发布时间: 2025-6-19 12:39| 查看数: 118| 评论数: 0|帖子模式

    什么是pipeline

    在 Redis 中,Pipeline(管道)是一种客户端与服务器间通信的优化机制,旨在减少网络往返时间和提高命令执行效率。以下是 Redis Pipeline 的具体定义和特点:
    1.批量发送与接收:

    • 使用 Pipeline 时,客户端不再逐条发送命令,而是将多个命令一次性打包成一个请求包发送给 Redis 服务器。相应地,服务器在接收到这个请求包后,不是立即返回每条命令的执行结果,而是先将所有命令依次执行完毕,然后将所有结果打包成一个响应包返回给客户端。
    • 这种做法显著减少了客户端与服务器之间网络通信的次数,尤其是对于需要执行大量命令的场景,能够极大地降低网络延迟带来的影响。
    2.异步执行

    • 尽管 Redis Pipeline 中的所有命令是在服务器端按顺序执行的,但由于客户端与服务器之间的通信是批量进行的,客户端可以在发送完一批命令后立刻开始处理其他任务,而无需等待每个命令的单独响应。这种异步处理方式可以更好地利用客户端的计算资源,提高整体应用程序的并发性能。
    3.命令隔离

    • 在 Pipeline 中,每个命令的执行互不影响,即一个命令的执行结果不会影响后续命令的执行。这意味着即使某条命令执行失败,也不会阻止后续命令的执行。客户端在解析响应包时,可以根据响应内容判断每条命令的执行结果。
    4.使用场景

    • Pipeline 主要适用于需要对 Redis 执行大量命令的操作,如数据批量导入、大规模数据更新、复杂查询等。这些操作若不使用 Pipeline,可能会因为网络延迟导致整体执行时间显著增加。
    • 对于涉及事务(transaction)的操作,虽然也可以使用 Pipeline 来打包命令,但需要注意的是,Pipeline 不提供事务的原子性和一致性保证。如果需要确保一组命令作为一个原子单位执行,应使用 Redis 的 MULTI/EXEC 命令来开启事务。
    5.注意事项

    • 虽然 Pipeline 能够显著提高命令执行效率,但一次性发送的命令数量不宜过大,否则可能导致数据包过大,增加网络传输压力,甚至超过 Redis 服务器或客户端的缓冲区限制,引发错误。合理的命令打包大小需要根据实际环境和网络状况进行调整
    • 在使用 Pipeline 时,由于命令的响应是延迟返回的,客户端需要做好错误处理和重试策略,尤其是在网络不稳定或服务器负载较高的情况下。
    1. 总结来说,Redis Pipeline 是一种客户端与服务器间高效通信的技术,通过批量发送和接收命令,减少网络往返次数,提高命令执行效率,尤其适用于大量命令操作的场景。在使用时需注意命令打包大小的控制以及错误处理。
    复制代码
    场景一:我要向redis新增大批量的数据

    Redis Pipeline允许一次性发送多个命令到Redis服务器,而无需等待每个命令的响应,显著减少了网络往返时间和潜在的延迟。在Spring Boot应用中,可以使用RedisTemplate的executePipelined()方法实现:
    1. @Autowired
    2. private StringRedisTemplate redisTemplate

    3. public void batchInsertUsersWithPipeline(List<User> users, String keyPrefix, long ttlSeconds) {
    4.     redisTemplate.executePipelined((RedisCallback<Object>) connection -> {
    5.         for (User user : users) {
    6.             String key = generateKey(keyPrefix, user.getId());
    7.             String value = objectMapper.writeValueAsString(user);

    8.             connection.setEx(key.getBytes(), (int) ttlSeconds, value.getBytes());
    9.         }
    10.         return null;
    11.     });
    12. }   
    复制代码
    分批处理

    尽管Pipeline提高了效率,但对于千万级数据,一次性发送所有命令可能导致内存溢出或网络阻塞。因此,建议将数据分批处理,每批包含适量的记录(如1000条),逐批发送至Redis:
    1. public void insertUsersInBatches(List<User> users, String keyPrefix, long ttlSeconds, int batchSize) {
    2.     int start = 0;
    3.     while (start < users.size()) {
    4.         int end = Math.min(start + batchSize, users.size());
    5.         List<User> batch = users.subList(start, end);
    6.         batchInsertUsersWithPipeline(batch, keyPrefix, ttlSeconds);
    7.         start = end;
    8.     }
    9. }
    复制代码
    batchInsertUsersWithPipeline方法利用Redis Pipeline机制发送批量命令,可以在一定程度上提高插入操作的并发性,减少网络往返时间和整体耗时。然而,Pipeline本身并不能严格保证所有命令同时成功或失败,其主要特性如下:
    1.原子性:

    • Redis命令在Pipeline内部是原子性的,即单个命令的执行不会被其他命令中断。
    • 注意:这并不意味着整个Pipeline的所有命令作为一个整体具有原子性。Pipeline中的命令仍然是依次执行的,只是客户端与服务器之间的通信过程被优化了。
    2.响应顺序

    • Redis服务器会按接收到命令的顺序返回结果。即使在Pipeline中并发发送多个命令,客户端接收到的响应也将按照命令发送的顺序排列。
    3.故障处理

    • 如果Pipeline中的某个命令执行失败(如语法错误、key不存在等),后续命令通常仍会继续执行。
    • 错误信息会包含在相应命令的响应中,客户端可以根据这些信息判断哪些命令执行成功,哪些失败。
    综上所述,batchInsertUsersWithPipeline方法不能严格保证所有命令同时成功或失败。在实际使用中,如果需要确保一批数据要么全部成功插入,要么全部失败回滚,可以采取以下策略:

    事务( MULTI/EXEC/DISCARD ):


    • redis提供了事务(Transaction)功能,通过MULTI、EXEC和DISCARD等命令,可以将一组命令打包在一起执行,只有当所有命令都能成功执行时,整个事务才会提交;否则,任何命令失败都将导致整个事务回滚。
    • 尽管Redis事务不支持回滚到某一特定状态(即不保证隔离性),但在批量插入场景下,它可以满足“全有或全无”的要求。
    Lua脚本:

    • 使用Lua脚本编写批量插入逻辑,脚本在Redis服务器端执行,具备原子性。即使在网络中断或服务器重启等异常情况下,脚本要么完全执行,要么完全不执行,不会出现部分成功部分失败的情况。

    batchInsertUsersWithPipeline方法中的connection中各个方法的区别是什么?

    1.connection.setEx(key.getBytes(), (int) ttlSeconds, value.getBytes());
    这一行调用了RedisConnection的setEx方法,用于设置一个带有过期时间(Time To Live,TTL)的键值对。参数说明如下:

    • key.getBytes(): 将给定的键(字符串)转换为字节数组,这是Redis底层通信协议所要求的格式。
    • (int) ttlSeconds: 将过期时间(以秒为单位)转换为整数类型,表示键值对在指定秒数后自动过期并被删除。
    • value.getBytes(): 同样将给定的值(用户对象序列化后的JSON字符串)转换为字节数组
    setEx方法确保在设置键值对的同时为其设定一个过期时间。如果键已经存在,该方法会更新键的值和过期时间。这个操作在Pipeline模式下是原子的,即在同一时刻只有一个setEx命令被执行。
    2.connection.multi(); 和 connection.exec();
    这两个方法涉及Redis的事务(Transaction)功能。在Pipeline模式下,由于我们希望保持较高的性能,一般不会使用这两个方法。但如果确实需要保证一批命令的原子性,可以使用如下方式:

    • connection.multi(): 开启一个事务块,后续的所有命令都会被放入这个事务中,直到调用exec方法。在Pipeline模式下,调用multi方法可能会破坏原有的性能优化效果。
    • connection.exec(): 提交并执行事务中的所有命令。如果事务中有任何一个命令执行失败,其他命令也会被取消执行,整个事务被视为失败。
    在您的batchInsertUsersWithPipeline方法中并没有使用multi和exec,因为Pipeline已经提供了高效的批量执行机制,而且这里的目的是提高插入性能,而不是实现严格的事务行为。
    综上所述,batchInsertUsersWithPipeline方法中直接使用了setEx方法,利用Pipeline来高效地批量插入带有过期时间的键值对。如果需要实现更严格的事务控制,应考虑使用Redis的事务(MULTI/EXEC)或Lua脚本,但这通常会牺牲一定的性能,并且与Pipeline机制不完全兼容。在实际应用场景中,应根据业务需求权衡选择合适的操作方式。
    3.connection.set()和connection.setNx有什么区别
    connection.set() 和 connection.setNx() 都是Redis的键值对设置方法,它们的主要区别在于是否存在条件以及对已有键的处理方式:
    1.connection.set(key, value)
    这是最基础的设置键值对的方法,无论键是否存在,都会直接覆盖(或创建)对应的键值对。参数说明如下:

    • key: 要设置的键。
    • value: 要关联的值。
    行为特点:

    • 无条件设置:不论键是否存在,都会执行设置操作。
    • 覆盖已有键:如果键已存在,其原有值会被新的值覆盖。
    • 创建新键:如果键不存在,会创建一个新的键值对。
    2.connection.setNx(key, value)
    这是带有条件的设置键值对方法,仅当键不存在时才会设置键值对。参数与set()相同:

    • key: 要设置的键
    • value: 要关联的值。
    行为特点

    • 有条件设置:仅在键不存在的情况下执行设置操作。
    • 不覆盖已有键:如果键已存在,该方法不会有任何动作,既不会改变键的值,也不会抛出错误。
    • 创建新键:如果键不存在,会创建一个新的键值对。
    1. 总结来说,connection.set()无条件地设置或更新键值对,而connection.setNx()则是在键不存在时才设置键值对,如果键已存在,则不会执行任何操作。前者适用于常规的键值更新或插入,后者常用于实现锁机制、唯一性检查等场景,确保某个键的值只在首次设置时有效。在您的batchInsertUsersWithPipeline方法中,由于目标是批量插入新数据,所以使用了setEx方法(带有过期时间的set),确保每个用户数据作为一个新的键值对被添加到Redis中。如果您需要在插入前检查键的唯一性,可以考虑使用setNx方法。不过,对于批量插入场景,通常假设数据是新的且键不存在,因此直接使用setEx更为常见。
    复制代码
    场景二:大批量删除redis中的数据
    1. public void batchDeleteKeysWithPipeline(List<String> keys) {
    2.     redisTemplate.executePipelined((RedisCallback<Object>) connection -> {
    3.         for (String key : keys) {
    4.             connection.del(key.getBytes());
    5.         }
    6.         return null;
    7.     });
    8. }
    复制代码

    • redisTemplate.executePipelined() 方法创建了一个Pipeline上下文,允许您在回调函数内发送多个命令而不等待响应。
    • 回调函数遍历要删除的键列表,对每个键调用 connection.del(key.getBytes())。del 方法用于删除指定键,将键名转换为字节数组后传递给Redis。
    • 所有del命令在Pipeline中被连续发送至Redis服务器,期间客户端不会等待任何响应。
    • 当回调函数执行完毕并返回时,Pipeline中的命令会被一次性发送至Redis,并接收所有命令的响应。由于命令是在一次网络往返中批量发送的,因此比单独执行每个删除命令效率更高。

    场景三:删除redis中千万级别的数据

    1.批量删除策略

    • 使用 SCAN 命令结合 DEL 命令实现批量删除。SCAN 命令用于增量式地迭代数据集,避免一次性获取所有键导致内存溢出。
      DEL 命令用于删除单个或多个键。
    2.并行处理

    • 利用多线程或异步任务将批量删除操作分散到多个工作线程中,提高删除效率。
    3.Redis 客户端优化:

    • 选择高性能、支持批量操作和管道(Pipeline)功能的 Redis 客户端库,如 Jedis 或 Lettuce。
    4.监控与故障恢复:

    • 在执行大规模删除操作时,密切关注 Redis 的性能指标(如 CPU、内存、网络带宽等)以及客户端程序的状态。
    • 准备应对可能的异常情况,如断连重试、数据一致性检查等。

    基于Jedis客户端实现
    1. import redis.clients.jedis.Jedis;
    2. import redis.clients.jedis.ScanParams;
    3. import redis.clients.jedis.ScanResult;

    4. public class RedisDataDeleter {

    5.     private static final int SCAN_BATCH_SIZE = 1000; // 可根据实际情况调整
    6.     private static final String MATCH_PATTERN = "*"; // 匹配所有键

    7.     public void deleteAllKeys(Jedis jedis) {
    8.         ScanParams scanParams = new ScanParams().count(SCAN_BATCH_SIZE).match(MATCH_PATTERN);

    9.         String cursor = "0";
    10.         while (true) {
    11.             ScanResult<String> scanResult = jedis.scan(cursor, scanParams);
    12.             cursor = scanResult.getCursor();

    13.             List<String> keysToDelete = scanResult.getResult();
    14.             if (!keysToDelete.isEmpty()) {
    15.                 // 使用 Pipeline 批量删除键
    16.                 Pipeline pipeline = jedis.pipelined();
    17.                 for (String key : keysToDelete) {
    18.                     pipeline.del(key);
    19.                 }
    20.                 pipeline.sync(); // 执行批量命令
    21.             }

    22.             if ("0".equals(cursor)) {
    23.                 break; // 扫描完成
    24.             }
    25.         }
    26.     }
    27. }
    复制代码
    注意

    • 请确保在生产环境中适当调整 SCAN_BATCH_SIZE 参数,使其既能充分利用系统资源,又不会对 Redis 服务器造成过大压力。
    • 在执行大规模删除操作前,最好先备份重要数据,并在非高峰期进行操作,以减少对业务的影响。
    1. 如果条件允许,建议升级到 Redis 6.x 版本,并启用 activedefrag 配置项,有助于在删除大量数据后及时进行碎片整理,保持 Redis 内存的高效利用。同时,监控 Redis 的内存使用情况和碎片率,必要时手动触发 BGREWRITEAOF 或 BGSAVE 操作。
    复制代码
    maven
    1. <dependencies>
    2.     <!-- ... 其他依赖 ... -->
    3.     <dependency>
    4.         <groupId>redis.clients</groupId>
    5.         <artifactId>jedis</artifactId>
    6.         <version>3.7.0</version> <!-- 根据实际版本号调整 -->
    7.     </dependency>
    8. </dependencies>
    复制代码
    jedis连接池配置
    1. spring.redis.host=192.168.1.100
    2. spring.redis.port=6379
    3. spring.redis.password=mysecretpassword  # 如果有密码,请填写

    4. # Jedis 连接池配置
    5. spring.redis.jedis.pool.max-active=10
    6. spring.redis.jedis.pool.max-idle=6
    7. spring.redis.jedis.pool.min-idle=2
    8. spring.redis.jedis.pool.max-wait=2000ms
    复制代码
    jedisConfig
    1. @Configuration
    2. public class JedisConfig {

    3.     @Value("${spring.redis.host}")
    4.     private String host;

    5.     @Value("${spring.redis.port}")
    6.     private int port;

    7.     @Value("${spring.redis.password}")
    8.     private String password;

    9.     @Bean
    10.     public JedisPool jedisPool() {
    11.         JedisPoolConfig poolConfig = new JedisPoolConfig();
    12.         poolConfig.setMaxTotal(Integer.parseInt(env.getProperty("spring.redis.jedis.pool.max-active")));
    13.         poolConfig.setMaxIdle(Integer.parseInt(env.getProperty("spring.redis.jedis.pool.max-idle")));
    14.         poolConfig.setMinIdle(Integer.parseInt(env.getProperty("spring.redis.jedis.pool.min-idle")));
    15.         poolConfig.setMaxWaitMillis(Long.parseLong(env.getProperty("spring.redis.jedis.pool.max-wait")));

    16.         return new JedisPool(poolConfig, host, port, Protocol.DEFAULT_TIMEOUT, password);
    17.     }
    18. }
    复制代码
    实现 Redis 数据删除服务
    1. @Service
    2. public class RedisDataDeleterService {

    3.     @Autowired
    4.     private JedisPool jedisPool;

    5.     public void deleteAllKeys() {
    6.         try (Jedis jedis = jedisPool.getResource()) {
    7.             ScanParams scanParams = new ScanParams().match("*").count(1000);

    8.             String cursor = "0";
    9.             while (true) {
    10.                 ScanResult<String> scanResult = jedis.scan(cursor, scanParams);
    11.                 cursor = scanResult.getCursor();

    12.                 List<String> keysToDelete = scanResult.getResult();
    13.                 if (!keysToDelete.isEmpty()) {
    14.                     Pipeline pipeline = jedis.pipelined();
    15.                     for (String key : keysToDelete) {
    16.                         pipeline.del(key);
    17.                     }
    18.                     pipeline.sync();
    19.                 }

    20.                 if ("0".equals(cursor)) {
    21.                     break;
    22.                 }
    23.             }
    24.         }
    25.     }
    26. }
    复制代码
    调用删除服务
    1. @RestController
    2. @RequestMapping("/redis")
    3. public class RedisController {

    4.     @Autowired
    5.     private RedisDataDeleterService redisDataDeleterService;

    6.     @GetMapping("/delete-all-keys")
    7.     public ResponseEntity<?> deleteAllKeys() {
    8.         redisDataDeleterService.deleteAllKeys();
    9.         return ResponseEntity.ok().build();
    10.     }
    11. }
    复制代码
    基于Lettuce

    maven
    1. <dependencies>
    2.     <!-- ... 其他依赖 ... -->
    3.     <dependency>
    4.         <groupId>io.lettuce</groupId>
    5.         <artifactId>lettuce-core</artifactId>
    6.         <version>6.2.¼</version> <!-- 根据实际版本号调整 -->
    7.     </dependency>
    8. </dependencies>
    复制代码
    配置Lettuce
    Spring Boot 自动配置会为 Lettuce 提供连接池支持。在 application.properties 或 application.yml 中配置 Redis 连接信息:
    1. spring.redis.host=192.168.1.100
    2. spring.redis.port=6379
    3. spring.redis.password=mysecretpassword  # 如果有密码,请填写
    复制代码
    使用 Lettuce 客户端执行批量删除操作:
    1. @Service
    2. public class RedisDataDeleterService {

    3.     @Autowired
    4.     private RedisConnectionFactory connectionFactory;

    5.     public void deleteAllKeys() {
    6.         RedisAsyncCommands<String, String> asyncCommands = connectionFactory.getConnection().async();

    7.         ScanArgs scanArgs = ScanArgs.Builder.matches("*").count(1000);
    8.         RedisFuture<ScanResult<String>> scanFuture = asyncCommands.scan(ScanCursor.INITIAL, scanArgs);

    9.         AtomicBoolean isRunning = new AtomicBoolean(true);
    10.         AtomicReference<ScanCursor> lastCursor = new AtomicReference<>(ScanCursor.INITIAL);

    11.         // 异步处理扫描结果
    12.         scanFuture.thenAccept(scanResult -> {
    13.             lastCursor.set(scanResult.getCursor());
    14.             List<String> keysToDelete = scanResult.getKeys();
    15.             if (!keysToDelete.isEmpty()) {
    16.                 RedisFuture<Long> delFuture = asyncCommands.del(keysToDelete.toArray(new String[0]));
    17.                 delFuture.thenAccept(count -> {
    18.                     if (isRunning.get()) {
    19.                         // 如果仍在运行,继续扫描
    20.                         deleteAllKeysRecursive(asyncCommands, scanArgs, lastCursor, isRunning);
    21.                     }
    22.                 });
    23.             } else {
    24.                 isRunning.set(false);
    25.             }
    26.         });

    27.         // 设置超时时间(可根据实际情况调整)
    28.         CompletableFuture.runAsync(() -> {
    29.             try {
    30.                 Thread.sleep(120000); // 2分钟超时
    31.             } catch (InterruptedException e) {
    32.                 Thread.currentThread().interrupt();
    33.             }
    34.             isRunning.set(false);
    35.         });
    36.     }

    37.     private void deleteAllKeysRecursive(RedisAsyncCommands<String, String> asyncCommands,
    38.                                        ScanArgs scanArgs,
    39.                                        AtomicReference<ScanCursor> lastCursor,
    40.                                        AtomicBoolean isRunning) {
    41.         if (isRunning.get()) {
    42.             asyncCommands.scan(lastCursor.get(), scanArgs).thenAccept(scanResult -> {
    43.                 lastCursor.set(scanResult.getCursor());
    44.                 List<String> keysToDelete = scanResult.getKeys();
    45.                 if (!keysToDelete.isEmpty()) {
    46.                     asyncCommands.del(keysToDelete.toArray(new String[0])).thenAccept(count -> {
    47.                         if (isRunning.get()) {
    48.                             deleteAllKeysRecursive(asyncCommands, scanArgs, lastCursor, isRunning);
    49.                         }
    50.                     });
    51.                 } else {
    52.                     isRunning.set(false);
    53.                 }
    54.             });
    55.         }
    56.     }
    57. }
    复制代码
    调用
    1. @RestController
    2. @RequestMapping("/redis")
    3. public class RedisController {

    4.     @Autowired
    5.     private RedisDataDeleterService redisDataDeleterService;

    6.     @GetMapping("/delete-all-keys")
    7.     public ResponseEntity<?> deleteAllKeys() {
    8.         redisDataDeleterService.deleteAllKeys();
    9.         return ResponseEntity.ok().build();
    10.     }
    11. }
    复制代码
    到此这篇关于Redis中管道操作pipeline的实现的文章就介绍到这了,更多相关Redis管道操作pipeline内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

    来源:https://www.jb51.net/database/338296qlx.htm
    免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

    最新评论

    QQ Archiver 手机版 小黑屋 福建二哥 ( 闽ICP备2022004717号|闽公网安备35052402000345号 )

    Powered by Discuz! X3.5 © 2001-2023

    快速回复 返回顶部 返回列表