引言
在实时数据处理系统中,我们经常需要统计某个事件在特定时间窗口内的发生次数,例如:
- 统计用户每小时访问次数
- 限制设备每分钟请求频率
- 广告曝光按小时去重计数
这类需求通常面临两个核心挑战:
- 高并发计数:多台服务器同时读写同一个计数器
- 精确时间窗口:数据到点自动过期,避免累积
本文将详细介绍如何基于 Redis 实现高性能、高可用的计数方案,并提供完整的Java代码实现。
一、Redis计数方案选型
1.1 为什么选择Redis
方案QPS数据一致性实现复杂度数据库+事务~1K强一致高本地缓存~100K最终一致中Redis原子操作50K+强一致低Redis的单线程模型天然适合计数场景,提供INCR/INCRBY等原子命令。
1.2 Key设计原则
- // 格式:业务前缀:appId:deviceId:ip:时间窗口
- String key = "flow:count:app123:device456:127.0.0.1:2023080117";
复制代码
- 包含所有维度信息
- 时间窗口按小时切分(可调整)
- 添加业务前缀避免冲突
二、基础实现方案
2.1 简单INCRBY实现
- public void incrementCount(String key, int delta) {
- redisTemplate.opsForValue().increment(key, delta);
- }
复制代码 问题:没有过期时间,会导致数据无限堆积
2.2 增加过期时间
- public void incrementWithExpire(String key, int delta, long ttlSeconds) {
- redisTemplate.opsForValue().increment(key, delta);
- redisTemplate.expire(key, ttlSeconds, TimeUnit.SECONDS);
- }
复制代码 新问题:每次操作都设置TTL,造成冗余Redis调用
三、优化方案:精准TTL控制
3.1 判断Key是否首次写入
我们需要确保TTL只在Key创建时设置一次,两种实现方式:
方案A:Lua脚本(推荐)- private static final String LUA_SCRIPT =
- "local current = redis.call('INCRBY', KEYS[1], ARGV[1])\n" +
- "if current == tonumber(ARGV[1]) then\n" +
- " redis.call('EXPIRE', KEYS[1], ARGV[2])\n" +
- "end\n" +
- "return current";
- public Long incrementAtomically(String key, int delta, long ttl) {
- return redisTemplate.execute(
- new DefaultRedisScript<>(LUA_SCRIPT, Long.class),
- Collections.singletonList(key),
- String.valueOf(delta), String.valueOf(ttl)
- );
- }
复制代码 优势:
方案B:SETNX+INCRBY- public void incrementWithNX(String key, int delta, long ttl) {
- redisTemplate.executePipelined((RedisCallback<Object>) connection -> {
- StringRedisConnection conn = (StringRedisConnection) connection;
- conn.setNX(key, "0"); // 尝试初始化
- conn.incrBy(key, delta);
- if (conn.setNX(key + ":lock", "1")) { // 简易锁判断首次
- conn.expire(key, ttl);
- conn.expire(key + ":lock", 10);
- }
- return null;
- });
- }
复制代码 适用场景:Redis版本<2.6(不支持Lua)
四、完整生产级实现
4.1 时间窗口计算
- public long calculateTtlToNextHour() {
- LocalDateTime now = LocalDateTime.now();
- LocalDateTime nextHour = now.plusHours(1).truncatedTo(ChronoUnit.HOURS);
- return ChronoUnit.SECONDS.between(now, nextHour);
- }
复制代码 4.2 Kafka消费者集成
- @Component
- @RequiredArgsConstructor
- public class FlowCounter {
- private final RedisTemplate<String, String> redisTemplate;
- private static final String KEY_PREFIX = "flow:count:";
- @KafkaListener(topics = "${kafka.topic}")
- public void handleMessages(List<Message> messages) {
- Map<String, Integer> countMap = messages.stream()
- .collect(Collectors.toMap(
- this::buildKey,
- msg -> 1,
- Integer::sum
- ));
-
- countMap.forEach((k, v) ->
- incrementAtomically(k, v, calculateTtlToNextHour())
- );
- }
- private String buildKey(Message msg) {
- return String.format("%s%s:%s:%s:%s",
- KEY_PREFIX,
- msg.getAppId(),
- msg.getDeviceId(),
- msg.getIp(),
- LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMddHH"))
- );
- }
- }
复制代码 4.3 查询接口
- public long getCurrentCount(String appId, String deviceId, String ip) {
- String key = buildKey(appId, deviceId, ip);
- String val = redisTemplate.opsForValue().get(key);
- return val != null ? Long.parseLong(val) : 0L;
- }
复制代码 五、性能优化技巧
5.1 Pipeline批量处理
- redisTemplate.executePipelined((RedisCallback<Object>) connection -> {
- StringRedisConnection conn = (StringRedisConnection) connection;
- countMap.forEach((k, v) -> {
- conn.incrBy(k, v);
- // 可结合Lua脚本进一步优化
- });
- return null;
- });
复制代码 5.2 本地预聚合
- // 在内存中先合并相同Key的计数
- Map<String, Integer> localCount = messages.stream()
- .collect(Collectors.toMap(
- this::buildKey,
- m -> 1,
- Integer::sum
- ));
复制代码 5.3 集群部署注意事项
使用{}强制哈希标签,保证相同Key路由到同一节点- "{flow}:count:app123:..."
复制代码 考虑分片策略避免热点
六、异常处理与监控
6.1 Redis重试机制
- @Retryable(maxAttempts = 3, backoff = @Backoff(delay = 100))
- public void safeIncrement(String key, int delta) {
- // 业务逻辑
- }
复制代码 6.2 监控指标
- # TYPE redis_operations_total counter
- redis_operations_total{operation="incr"} 12345
- redis_operations_total{operation="expire"} 678
复制代码 6.3 数据补偿
- @Scheduled(fixedRate = 3600000)
- public void checkDataConsistency() {
- // 对比DB与Redis计数差异
- }
复制代码 七、方案对比总结
方案优点缺点适用场景Lua脚本原子性强,性能最佳需要Redis 2.6+新项目首选SETNX+INCR兼容旧版有竞态风险遗留系统纯INCR+TTL实现简单TTL冗余不推荐生产
结语
通过本文的方案,我们实现了:
- 单机50K+ QPS的计数能力
- 精确到小时的时间窗口控制
- 分布式环境下的强一致性
最佳实践建议:
- 生产环境优先选择Lua脚本方案
- 对于超高并发场景(如双11),可增加本地缓存层
- 定期检查Redis内存使用情况
以上就是高并发下Redis精确计数与时间窗口过期的方法详解的详细内容,更多关于Redis高并发精确计数的资料请关注脚本之家其它相关文章!
来源:https://www.jb51.net/database/338493l9a.htm
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作! |
|