Redis

Redis 核心知识笔记 - 第四部分:高并发问题与解决方案

Redis 核心知识笔记 - 第四部分:高并发问题与解决方案

一、缓存问题概览


二、缓存穿透

2.1 问题描述

缓存穿透:查询一个一定不存在的数据,由于缓存中没有,每次都会去数据库查询,导致数据库压力过大。

2.2 解决方案

方案一:缓存空值

@Service
public class CachePenetrationService {

    /** Placeholder stored in Redis marking "this id does not exist in the DB". */
    private static final String EMPTY_VALUE = "NULL";
    /** TTL for the null placeholder: 5 minutes, much shorter than real entries. */
    private static final long EMPTY_EXPIRE = 300;

    @Autowired
    private StringRedisTemplate redisTemplate;

    @Autowired
    private UserMapper userMapper;

    /**
     * Looks up a user by id, caching negative results so repeated queries for
     * non-existent ids do not hammer the database (cache-penetration defense).
     *
     * @param id user primary key
     * @return the user, or null when the id does not exist
     */
    public User getUserById(Long id) {
        final String key = "user:" + id;
        final String cached = redisTemplate.opsForValue().get(key);

        // Cache hit: either the null placeholder or a serialized user.
        if (cached != null) {
            return EMPTY_VALUE.equals(cached) ? null : JSON.parseObject(cached, User.class);
        }

        // Cache miss: fall through to the database.
        final User user = userMapper.selectById(id);
        if (user == null) {
            // Cache the miss with a short TTL so a later insert becomes visible soon.
            redisTemplate.opsForValue().set(key, EMPTY_VALUE,
                EMPTY_EXPIRE, TimeUnit.SECONDS);
        } else {
            redisTemplate.opsForValue().set(key, JSON.toJSONString(user),
                1, TimeUnit.HOURS);
        }
        return user;
    }
}

方案二:布隆过滤器

@Service
public class BloomFilterService {

    private static final String BLOOM_KEY = "user:bloom";

    @Autowired
    private StringRedisTemplate redisTemplate;

    @Autowired
    private RedissonClient redissonClient;

    // FIX: was referenced by loadAllUserIds() / getUserById() but never declared.
    @Autowired
    private UserMapper userMapper;

    private RBloomFilter<Long> bloomFilter;

    /**
     * Initializes the Redisson-backed Bloom filter and warms it with every
     * existing user id so lookups for non-existent ids short-circuit.
     */
    @PostConstruct
    public void init() {
        bloomFilter = redissonClient.getBloomFilter(BLOOM_KEY);
        // Sized for 1M elements with a 0.01% false-positive rate.
        bloomFilter.tryInit(1000000L, 0.0001);

        // Warm the filter with every id currently in the database.
        loadAllUserIds();
    }

    /** Loads all existing user ids from the database into the filter. */
    private void loadAllUserIds() {
        List<Long> userIds = userMapper.selectAllIds();
        userIds.forEach(id -> bloomFilter.add(id));
    }

    /**
     * Looks up a user, rejecting ids the Bloom filter says cannot exist.
     * NOTE: the filter can yield false positives (~0.01%), so a "contains" hit
     * still falls through to cache/DB; it never yields false negatives.
     *
     * @param id user primary key
     * @return the user, or null when absent
     */
    public User getUserById(Long id) {
        // 1. Definitely absent: skip cache and DB entirely.
        if (!bloomFilter.contains(id)) {
            return null;
        }

        // 2. Possibly present: consult the cache.
        String key = "user:" + id;
        String cacheValue = redisTemplate.opsForValue().get(key);
        if (cacheValue != null) {
            return JSON.parseObject(cacheValue, User.class);
        }

        // 3. Cache miss: query the database and back-fill the cache on a hit.
        User user = userMapper.selectById(id);
        if (user != null) {
            redisTemplate.opsForValue().set(key, JSON.toJSONString(user),
                1, TimeUnit.HOURS);
        }
        return user;
    }

    /** Inserts a user and records its id so future reads pass the filter. */
    public void addUser(User user) {
        userMapper.insert(user);
        bloomFilter.add(user.getId());
    }
}

方案三:接口层校验

@RestController
public class UserController {

    // FIX: MAX_USER_ID and userService were referenced but never declared.
    /** Upper bound for plausible user ids; larger values are rejected up front. */
    private static final long MAX_USER_ID = 100_000_000L;

    @Autowired
    private UserService userService;

    /**
     * Fetches a user, filtering obviously invalid ids at the API boundary so
     * they never reach the cache or database (penetration defense, layer 1).
     *
     * @param id user primary key from the path
     * @return success with the user, or a failure result for invalid ids
     */
    @GetMapping("/user/{id}")
    public Result<User> getUser(@PathVariable Long id) {
        // Reject null / non-positive ids outright.
        if (id == null || id <= 0) {
            return Result.fail("参数非法");
        }

        // Reject ids beyond the plausible range.
        if (id > MAX_USER_ID) {
            return Result.fail("用户不存在");
        }

        return Result.success(userService.getUserById(id));
    }
}

三、缓存击穿

3.1 问题描述

缓存击穿:某个热点 Key 在过期的瞬间,大量并发请求同时访问,全部打到数据库。

3.2 解决方案

方案一:互斥锁(分布式锁)

@Service
public class CacheBreakdownMutexService {

    @Autowired
    private StringRedisTemplate redisTemplate;

    @Autowired
    private ProductMapper productMapper;

    private static final String LOCK_PREFIX = "lock:";
    /** Max rounds to wait for another thread to rebuild the cache (~5s total). */
    private static final int MAX_RETRIES = 100;
    /** Lua: delete the lock only if we still own it (atomic check-and-delete). */
    private static final String UNLOCK_SCRIPT =
        "if redis.call('get', KEYS[1]) == ARGV[1] then " +
        "   return redis.call('del', KEYS[1]) " +
        "else " +
        "   return 0 " +
        "end";

    /**
     * Loads a product through the cache, using a distributed mutex so that on a
     * cache miss only one caller rebuilds the entry (cache-breakdown defense).
     *
     * FIXES over the original: the retry used unbounded recursion (stack risk),
     * the cache was not re-checked after acquiring the lock, and the unlock
     * script ran even when the lock was never acquired.
     *
     * @param id product primary key
     * @return the product, or null when absent or when retries are exhausted
     */
    public Product getProductById(Long id) {
        String key = "product:" + id;

        for (int attempt = 0; attempt < MAX_RETRIES; attempt++) {
            String cacheValue = redisTemplate.opsForValue().get(key);
            if (cacheValue != null) {
                return JSON.parseObject(cacheValue, Product.class);
            }

            String lockKey = LOCK_PREFIX + key;
            String lockValue = UUID.randomUUID().toString();
            Boolean locked = redisTemplate.opsForValue()
                .setIfAbsent(lockKey, lockValue, 10, TimeUnit.SECONDS);

            if (Boolean.TRUE.equals(locked)) {
                try {
                    // Double-check: another thread may have filled the cache
                    // between our miss and our lock acquisition.
                    cacheValue = redisTemplate.opsForValue().get(key);
                    if (cacheValue != null) {
                        return JSON.parseObject(cacheValue, Product.class);
                    }

                    Product product = productMapper.selectById(id);
                    if (product != null) {
                        redisTemplate.opsForValue().set(key,
                            JSON.toJSONString(product), 1, TimeUnit.HOURS);
                    }
                    return product;
                } finally {
                    // Release only our own lock, and only because we hold it.
                    redisTemplate.execute(
                        new DefaultRedisScript<>(UNLOCK_SCRIPT, Long.class),
                        Collections.singletonList(lockKey), lockValue);
                }
            }

            // Lock held elsewhere: back off briefly, then re-check the cache.
            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return null;
            }
        }
        // Retries exhausted; treat as a miss rather than looping forever.
        return null;
    }
}

方案二:逻辑过期

// Envelope pairing a cached payload with its logical expiry timestamp.
// The Redis key itself carries no TTL; staleness is decided in application code.
@Data
public class CacheData<T> {
    private T data;
    private Long expireTime; // logical expiry, epoch milliseconds
}

@Service
public class LogicalExpireService {

    @Autowired
    private StringRedisTemplate redisTemplate;

    @Autowired
    private ProductMapper productMapper;

    /** Pool for asynchronous cache rebuilds; bounded to cap rebuild load. */
    private static final ExecutorService EXECUTOR =
        Executors.newFixedThreadPool(10);

    /**
     * Logical-expiry read path: entries are stored without a Redis TTL and
     * carry their own expiry timestamp. An expired entry is returned
     * immediately (stale read) while one lock-holder refreshes it async.
     *
     * FIXES over the original: a missing expireTime no longer throws NPE
     * (treated as expired), and the async task re-checks freshness inside the
     * lock so concurrent nodes do not all reload the same entry.
     *
     * @param id product primary key
     * @return the (possibly stale) product, or null on a cold miss for an
     *         id absent from the database
     */
    public Product getProductById(Long id) {
        String key = "product:" + id;
        String cacheValue = redisTemplate.opsForValue().get(key);

        // Cold start: nothing cached yet — load synchronously.
        if (cacheValue == null) {
            return loadAndCache(id, key);
        }

        CacheData<Product> cacheData = JSON.parseObject(cacheValue,
            new TypeReference<CacheData<Product>>() {});
        Product product = cacheData.getData();
        Long expireTime = cacheData.getExpireTime();

        // Fresh entry: return it directly. A null expiry counts as expired.
        if (expireTime != null && System.currentTimeMillis() < expireTime) {
            return product;
        }

        // Expired: try to become the single refresher via a short-lived lock.
        String lockKey = "lock:" + key;
        Boolean locked = redisTemplate.opsForValue()
            .setIfAbsent(lockKey, "1", 10, TimeUnit.SECONDS);

        if (Boolean.TRUE.equals(locked)) {
            EXECUTOR.submit(() -> {
                try {
                    // Double-check inside the lock: another node may have
                    // refreshed the entry while we were acquiring it.
                    if (isStillExpired(key)) {
                        loadAndCache(id, key);
                    }
                } finally {
                    redisTemplate.delete(lockKey);
                }
            });
        }

        // Return the stale value either way; freshness arrives asynchronously.
        return product;
    }

    /** Returns true when the cached entry is absent or logically expired. */
    private boolean isStillExpired(String key) {
        String value = redisTemplate.opsForValue().get(key);
        if (value == null) {
            return true;
        }
        CacheData<Product> data = JSON.parseObject(value,
            new TypeReference<CacheData<Product>>() {});
        Long expireTime = data.getExpireTime();
        return expireTime == null || System.currentTimeMillis() >= expireTime;
    }

    /**
     * Loads a product from the database and stores it wrapped in a CacheData
     * envelope with a logical expiry one hour out. The Redis key gets no TTL.
     */
    private Product loadAndCache(Long id, String key) {
        Product product = productMapper.selectById(id);
        if (product != null) {
            CacheData<Product> cacheData = new CacheData<>();
            cacheData.setData(product);
            // Logical expiry: one hour from now.
            cacheData.setExpireTime(System.currentTimeMillis() + 3600 * 1000);

            // Stored without a TTL — staleness is decided by expireTime only.
            redisTemplate.opsForValue().set(key, JSON.toJSONString(cacheData));
        }
        return product;
    }
}

方案对比

| 方案 | 优点 | 缺点 | 适用场景 |
| --- | --- | --- | --- |
| 互斥锁 | 保证数据一致性 | 有等待时间,性能略低 | 数据一致性要求高 |
| 逻辑过期 | 无等待,性能高 | 可能返回过期数据 | 允许短暂数据不一致 |

四、缓存雪崩

4.1 问题描述

缓存雪崩:大量 Key 同时过期,或 Redis 服务宕机,导致大量请求直接打到数据库。

4.2 解决方案

方案一:过期时间随机化

@Service
public class CacheAvalancheService {

    @Autowired
    private StringRedisTemplate redisTemplate;

    /** Shared RNG for jittering TTLs (java.util.Random is thread-safe). */
    private static final Random random = new Random();

    /**
     * Stores one entry whose TTL is the base plus up to 300s of random jitter,
     * so entries written together do not all expire at the same instant.
     */
    public void cacheWithRandomExpire(String key, String value, long baseSeconds) {
        redisTemplate.opsForValue()
            .set(key, value, jitteredTtl(baseSeconds), TimeUnit.SECONDS);
    }

    /**
     * Stores a batch of entries, giving each its own jittered TTL so a bulk
     * write cannot produce a synchronized mass expiry (cache avalanche).
     */
    public void batchCache(Map<String, String> data, long baseSeconds) {
        for (Map.Entry<String, String> entry : data.entrySet()) {
            redisTemplate.opsForValue()
                .set(entry.getKey(), entry.getValue(),
                     jitteredTtl(baseSeconds), TimeUnit.SECONDS);
        }
    }

    /** Base TTL plus a random 0-299 second offset. */
    private long jitteredTtl(long baseSeconds) {
        return baseSeconds + random.nextInt(300);
    }
}

方案二:多级缓存

@Service
public class MultiLevelCacheService {

    @Autowired
    private StringRedisTemplate redisTemplate;

    @Autowired
    private ProductMapper productMapper;

    /** L1: in-process Caffeine cache; short TTL keeps it roughly fresh. */
    private final Cache<String, Product> localCache = Caffeine.newBuilder()
        .maximumSize(10000)
        .expireAfterWrite(5, TimeUnit.MINUTES)
        .build();

    /**
     * Multi-level read path: local cache -> Redis -> database, back-filling
     * each level on the way out so a Redis outage degrades to the local layer
     * instead of cascading straight to the database.
     *
     * @param id product primary key
     * @return the product, or null when absent everywhere
     */
    public Product getProductById(Long id) {
        String key = "product:" + id;

        // 1. L1: local cache.
        Product product = localCache.getIfPresent(key);
        if (product != null) {
            return product;
        }

        // 2. L2: Redis; back-fill the local cache on a hit.
        String cacheValue = redisTemplate.opsForValue().get(key);
        if (cacheValue != null) {
            product = JSON.parseObject(cacheValue, Product.class);
            localCache.put(key, product);
            return product;
        }

        // 3. Database; back-fill both layers on a hit.
        product = productMapper.selectById(id);
        if (product != null) {
            // FIX: use ThreadLocalRandom rather than allocating a new Random
            // per call — cheaper and better distributed under concurrency.
            // Jittered TTL (3600-3899s) avoids synchronized expiry.
            long expire = 3600 + ThreadLocalRandom.current().nextInt(300);
            redisTemplate.opsForValue().set(key, JSON.toJSONString(product),
                expire, TimeUnit.SECONDS);
            localCache.put(key, product);
        }

        return product;
    }
}

方案三:熔断降级

@Service
public class CircuitBreakerService {

    @Autowired
    private StringRedisTemplate redisTemplate;

    // FIX: productService was referenced by getProduct() but never declared.
    @Autowired
    private ProductService productService;

    /** Resilience4j breaker guarding Redis calls (default thresholds). */
    private final CircuitBreaker circuitBreaker = CircuitBreaker.ofDefaults("redis");

    /**
     * Reads a value through the circuit breaker: once Redis starts failing,
     * the breaker opens and calls fail fast instead of piling up on a dead
     * node (avalanche mitigation when Redis itself is down).
     */
    public String getValue(String key) {
        return circuitBreaker.executeSupplier(() ->
            redisTemplate.opsForValue().get(key)
        );
    }

    /**
     * Sentinel-guarded read: falls back on exceptions, sheds load when a
     * flow-control or degradation rule trips.
     */
    @SentinelResource(value = "getProduct",
        fallback = "getProductFallback",
        blockHandler = "getProductBlockHandler")
    public Product getProduct(Long id) {
        return productService.getById(id);
    }

    /** Fallback when getProduct throws: serve a placeholder product. */
    public Product getProductFallback(Long id, Throwable e) {
        return new Product(id, "默认商品", BigDecimal.ZERO);
    }

    /** Handler when Sentinel blocks the call (rate limit / degradation). */
    public Product getProductBlockHandler(Long id, BlockException e) {
        throw new RuntimeException("系统繁忙,请稍后重试");
    }
}

方案四:Redis 高可用

# 使用 Redis Cluster 或 哨兵模式保证高可用

# 哨兵配置
sentinel monitor mymaster 192.168.1.100 6379 2
sentinel down-after-milliseconds mymaster 5000
sentinel failover-timeout mymaster 60000

五、大 Key 问题

5.1 什么是大 Key

| 类型 | 判定标准 |
| --- | --- |
| String | Value > 10KB |
| Hash/Set/ZSet/List | 元素数量 > 10000 或总大小 > 10MB |

危害:

  • 内存分布不均匀
  • 阻塞其他请求
  • 网络带宽占用
  • 主从同步延迟
  • 删除时阻塞

5.2 发现大 Key

# 方法1:redis-cli --bigkeys
redis-cli --bigkeys -a password

# 方法2:使用 SCAN 遍历
redis-cli --scan --pattern '*' | while read key; do
    type=$(redis-cli type "$key")
    case $type in
        string)
            size=$(redis-cli strlen "$key")
            ;;
        list)
            size=$(redis-cli llen "$key")
            ;;
        hash)
            size=$(redis-cli hlen "$key")
            ;;
        set)
            size=$(redis-cli scard "$key")
            ;;
        zset)
            size=$(redis-cli zcard "$key")
            ;;
    esac
    echo "$key: $type, $size"
done

# 方法3:MEMORY USAGE 命令(Redis 4.0+)
MEMORY USAGE key

5.3 解决方案

方案一:拆分大 Key

@Service
public class BigKeyService {

    @Autowired
    private StringRedisTemplate redisTemplate;

    // Split one huge hash (e.g. user:followers with 1M entries) into a fixed
    // number of shard hashes: user:followers:{uid}:0 .. {uid}:{SHARD_COUNT-1}.
    // Hash sharding keeps every shard bounded and makes enumeration trivial.
    private static final int SHARD_COUNT = 100;

    /**
     * Maps a follower id to its shard index.
     * FIX: the original computed (followerId % SIZE / SIZE), which is 0 for
     * every id — all entries landed in shard 0 and the key was never split.
     */
    private static int shardOf(Long followerId) {
        return (int) Math.floorMod(followerId, (long) SHARD_COUNT);
    }

    /** Records one follower's info in the shard owning its id. */
    public void addFollower(Long userId, Long followerId, String info) {
        String key = "user:followers:" + userId + ":" + shardOf(followerId);
        redisTemplate.opsForHash().put(key, followerId.toString(), info);
    }

    /** Reads one follower's info from its owning shard. */
    public String getFollower(Long userId, Long followerId) {
        String key = "user:followers:" + userId + ":" + shardOf(followerId);
        return (String) redisTemplate.opsForHash().get(key, followerId.toString());
    }

    /**
     * Collects all followers by visiting every shard. The shard set is fixed,
     * so absent shards are simply empty (the original stopped at the first
     * missing segment, which would drop data for non-contiguous ids).
     */
    public Map<String, String> getAllFollowers(Long userId) {
        Map<String, String> result = new HashMap<>();
        for (int shard = 0; shard < SHARD_COUNT; shard++) {
            String key = "user:followers:" + userId + ":" + shard;
            Map<Object, Object> entries = redisTemplate.opsForHash().entries(key);
            entries.forEach((k, v) -> result.put((String) k, (String) v));
        }
        return result;
    }
}

方案二:渐进式删除

@Service
public class BigKeyDeleteService {

    @Autowired
    private StringRedisTemplate redisTemplate;

    /** Fields/elements removed per round during gradual deletion. */
    private static final int BATCH_SIZE = 100;

    /**
     * Deletes a big key asynchronously with UNLINK (Redis 4.0+): the key is
     * removed from the keyspace immediately and freed by a background thread,
     * so the event loop is never blocked by the reclamation.
     */
    public void asyncDelete(String key) {
        redisTemplate.unlink(key);
    }

    /**
     * Deletes a large hash in small HSCAN-driven batches so no single HDEL
     * blocks the Redis event loop for long.
     */
    public void deleteHashGradually(String key) {
        ScanOptions options = ScanOptions.scanOptions().count(BATCH_SIZE).build();

        try (Cursor<Map.Entry<Object, Object>> cursor =
                redisTemplate.opsForHash().scan(key, options)) {
            List<Object> fields = new ArrayList<>();

            while (cursor.hasNext()) {
                fields.add(cursor.next().getKey());

                if (fields.size() >= BATCH_SIZE) {
                    redisTemplate.opsForHash().delete(key, fields.toArray());
                    fields.clear();

                    // Brief pause between batches so other clients get served.
                    try {
                        Thread.sleep(10);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return; // honor interruption; remaining fields survive
                    }
                }
            }

            // Delete any remainder smaller than a full batch.
            if (!fields.isEmpty()) {
                redisTemplate.opsForHash().delete(key, fields.toArray());
            }
        }

        // Finally remove the (now empty) key itself.
        redisTemplate.delete(key);
    }

    /**
     * Deletes a large list by trimming BATCH_SIZE elements per round.
     * FIX: the original assigned the void return of trim() to a Long (does
     * not compile) and kept looping after being interrupted.
     */
    public void deleteListGradually(String key) {
        while (true) {
            // Keep only elements from index BATCH_SIZE onward,
            // i.e. drop the first BATCH_SIZE elements this round.
            redisTemplate.opsForList().trim(key, BATCH_SIZE, -1);

            Long size = redisTemplate.opsForList().size(key);
            if (size == null || size == 0) {
                break;
            }
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break; // stop trimming; the delete below still runs
            }
        }
        redisTemplate.delete(key);
    }
}

方案三:压缩数据

@Service
public class CompressService {

    @Autowired
    private StringRedisTemplate redisTemplate;

    /**
     * GZIP-compresses the value, Base64-encodes the result, and stores it in
     * Redis — shrinking large string values to mitigate the big-key problem.
     *
     * @throws RuntimeException when compression fails
     */
    public void setCompressed(String key, String value, long expire, TimeUnit unit) {
        final byte[] raw = value.getBytes(StandardCharsets.UTF_8);
        final String encoded;
        try {
            encoded = Base64.getEncoder().encodeToString(compress(raw));
        } catch (IOException e) {
            throw new RuntimeException("压缩失败", e);
        }
        redisTemplate.opsForValue().set(key, encoded, expire, unit);
    }

    /**
     * Reads a value stored by {@link #setCompressed} and restores the original
     * string, or returns null when the key is absent.
     *
     * @throws RuntimeException when decompression fails
     */
    public String getCompressed(String key) {
        final String encoded = redisTemplate.opsForValue().get(key);
        if (encoded == null) {
            return null;
        }
        try {
            final byte[] raw = decompress(Base64.getDecoder().decode(encoded));
            return new String(raw, StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new RuntimeException("解压失败", e);
        }
    }

    /** GZIP-compresses a byte array entirely in memory. */
    private byte[] compress(byte[] data) throws IOException {
        final ByteArrayOutputStream sink = new ByteArrayOutputStream();
        try (GZIPOutputStream gzip = new GZIPOutputStream(sink)) {
            gzip.write(data);
        }
        return sink.toByteArray();
    }

    /** Inflates a GZIP-compressed byte array back to its original bytes. */
    private byte[] decompress(byte[] compressed) throws IOException {
        final ByteArrayOutputStream sink = new ByteArrayOutputStream();
        try (GZIPInputStream gzip =
                new GZIPInputStream(new ByteArrayInputStream(compressed))) {
            final byte[] chunk = new byte[1024];
            for (int n = gzip.read(chunk); n != -1; n = gzip.read(chunk)) {
                sink.write(chunk, 0, n);
            }
        }
        return sink.toByteArray();
    }
}

六、热 Key 问题

6.1 什么是热 Key

热 Key:某个 Key 访问频率极高,导致单节点压力过大。

场景:

  • 热门商品详情
  • 热搜话题
  • 秒杀商品
  • 明星微博

6.2 发现热 Key

# 方法1:redis-cli --hotkeys(需开启 LFU)
# 需要配置:maxmemory-policy volatile-lfu
redis-cli --hotkeys -a password

# 方法2:MONITOR 命令(生产慎用)
redis-cli monitor | head -n 1000

# 方法3:客户端统计
# 在代码中记录 Key 访问频率

6.3 解决方案

方案一:本地缓存

@Service
public class HotKeyLocalCacheService {

    @Autowired
    private StringRedisTemplate redisTemplate;

    /** Short-lived in-process cache serving keys currently marked hot. */
    private final Cache<String, String> hotCache = Caffeine.newBuilder()
        .maximumSize(1000)
        .expireAfterWrite(10, TimeUnit.SECONDS)  // small staleness window
        .build();

    /** Keys currently considered hot (fed by config center or live stats). */
    private Set<String> hotKeys = ConcurrentHashMap.newKeySet();

    /**
     * Reads a value: hot keys are served from the local cache (loading from
     * Redis on a local miss), everything else goes straight to Redis.
     */
    public String getValue(String key) {
        if (!hotKeys.contains(key)) {
            // Cold key: no local layer, query Redis directly.
            return redisTemplate.opsForValue().get(key);
        }
        // Hot key: local cache first; populate from Redis when absent.
        return hotCache.get(key, k -> redisTemplate.opsForValue().get(k));
    }

    /** Starts serving this key through the local cache. */
    public void markAsHot(String key) {
        hotKeys.add(key);
    }

    /** Stops local caching for this key and drops its cached copy. */
    public void unmarkAsHot(String key) {
        hotKeys.remove(key);
        hotCache.invalidate(key);
    }
}

方案二:Key 分片(复制多份)

@Service
public class HotKeyShardService {

    @Autowired
    private StringRedisTemplate redisTemplate;

    /** Number of replicas each hot key is copied into (key:0 .. key:9). */
    private static final int SHARD_COUNT = 10;

    /**
     * Writes a hot key by replicating it into SHARD_COUNT copies so reads
     * spread across cluster nodes instead of hammering one.
     */
    public void setHotKey(String key, String value, long expire, TimeUnit unit) {
        for (int i = 0; i < SHARD_COUNT; i++) {
            String shardKey = key + ":" + i;
            redisTemplate.opsForValue().set(shardKey, value, expire, unit);
        }
    }

    /**
     * Reads a random replica of the hot key, spreading read load.
     * FIX: uses ThreadLocalRandom instead of a contended shared Random.
     */
    public String getHotKey(String key) {
        int shard = ThreadLocalRandom.current().nextInt(SHARD_COUNT);
        return redisTemplate.opsForValue().get(key + ":" + shard);
    }

    /**
     * Updates every replica while preserving each shard's remaining TTL.
     * FIX: the original only updated shards with TTL > 0, so shards with
     * TTL == -1 (key exists but is persistent) were skipped and stayed
     * stale forever. TTL == -2 / null means the shard does not exist.
     */
    public void updateHotKey(String key, String value) {
        for (int i = 0; i < SHARD_COUNT; i++) {
            String shardKey = key + ":" + i;
            Long ttl = redisTemplate.getExpire(shardKey, TimeUnit.SECONDS);
            if (ttl != null && ttl > 0) {
                // Re-set with the remaining TTL so expiry timing is kept.
                redisTemplate.opsForValue().set(shardKey, value, ttl, TimeUnit.SECONDS);
            } else if (ttl != null && ttl == -1) {
                // Persistent shard: update in place without adding a TTL.
                redisTemplate.opsForValue().set(shardKey, value);
            }
            // ttl == -2 (or null): shard absent — nothing to update.
        }
    }
}

方案三:读写分离 + 增加从节点

@Configuration
public class HotKeyReadConfig {

    /**
     * Connection factory that routes reads to replicas so hot-key read load
     * is spread off the master.
     *
     * FIX: ReadFrom is only honored for master/replica, sentinel, or cluster
     * setups; with RedisStandaloneConfiguration it is silently ignored. A
     * static master/replica topology is therefore declared explicitly.
     */
    @Bean
    public LettuceConnectionFactory readReplicaConnectionFactory() {
        RedisStaticMasterReplicaConfiguration config =
            new RedisStaticMasterReplicaConfiguration("master_host", 6379);
        // NOTE(review): list the real replica address(es) here.
        config.addNode("replica_host", 6379);

        LettuceClientConfiguration clientConfig = LettuceClientConfiguration.builder()
            .readFrom(ReadFrom.REPLICA)  // route reads to replicas only
            .build();

        return new LettuceConnectionFactory(config, clientConfig);
    }
}

七、问题解决方案总结

| 问题 | 核心原因 | 推荐方案 |
| --- | --- | --- |
| 穿透 | 查询不存在的数据 | 布隆过滤器 + 空值缓存 |
| 击穿 | 热点 Key 过期 | 逻辑过期 或 互斥锁 |
| 雪崩 | 大量 Key 同时过期 | 随机过期时间 + 多级缓存 |
| 大 Key | 数据量过大 | 拆分 + 异步删除 |
| 热 Key | 访问过于集中 | 本地缓存 + Key 分片 |