Redis 核心知识笔记 - 第四部分:高并发问题与解决方案
Redis 核心知识笔记 - 第四部分:高并发问题与解决方案
一、缓存问题概览
二、缓存穿透
2.1 问题描述
缓存穿透:查询一个一定不存在的数据,由于缓存中没有,每次都会去数据库查询,导致数据库压力过大。
2.2 解决方案
方案一:缓存空值
@Service
public class CachePenetrationService {
@Autowired
private StringRedisTemplate redisTemplate;
@Autowired
private UserMapper userMapper;
private static final String EMPTY_VALUE = "NULL";
private static final long EMPTY_EXPIRE = 300; // 空值缓存5分钟
public User getUserById(Long id) {
String key = "user:" + id;
String cacheValue = redisTemplate.opsForValue().get(key);
// 缓存命中
if (cacheValue != null) {
// 判断是否是空值占位符
if (EMPTY_VALUE.equals(cacheValue)) {
return null;
}
return JSON.parseObject(cacheValue, User.class);
}
// 查询数据库
User user = userMapper.selectById(id);
if (user != null) {
// 正常缓存
redisTemplate.opsForValue().set(key, JSON.toJSONString(user),
1, TimeUnit.HOURS);
} else {
// 缓存空值,设置较短过期时间
redisTemplate.opsForValue().set(key, EMPTY_VALUE,
EMPTY_EXPIRE, TimeUnit.SECONDS);
}
return user;
}
}
方案二:布隆过滤器
@Service
public class BloomFilterService {
@Autowired
private StringRedisTemplate redisTemplate;
private static final String BLOOM_KEY = "user:bloom";
// 初始化布隆过滤器(使用 Redisson)
@Autowired
private RedissonClient redissonClient;
private RBloomFilter<Long> bloomFilter;
@PostConstruct
public void init() {
bloomFilter = redissonClient.getBloomFilter(BLOOM_KEY);
// 预计元素数量100万,误判率0.01%
bloomFilter.tryInit(1000000L, 0.0001);
// 初始化时加载所有存在的ID
loadAllUserIds();
}
private void loadAllUserIds() {
// 从数据库加载所有用户ID
List<Long> userIds = userMapper.selectAllIds();
userIds.forEach(id -> bloomFilter.add(id));
}
public User getUserById(Long id) {
// 1. 布隆过滤器判断
if (!bloomFilter.contains(id)) {
// 一定不存在
return null;
}
// 2. 可能存在,查缓存
String key = "user:" + id;
String cacheValue = redisTemplate.opsForValue().get(key);
if (cacheValue != null) {
return JSON.parseObject(cacheValue, User.class);
}
// 3. 查数据库
User user = userMapper.selectById(id);
if (user != null) {
redisTemplate.opsForValue().set(key, JSON.toJSONString(user),
1, TimeUnit.HOURS);
}
return user;
}
// 新增用户时添加到布隆过滤器
public void addUser(User user) {
userMapper.insert(user);
bloomFilter.add(user.getId());
}
}
方案三:接口层校验
@RestController
public class UserController {
@GetMapping("/user/{id}")
public Result<User> getUser(@PathVariable Long id) {
// 参数校验:过滤明显非法的请求
if (id == null || id <= 0) {
return Result.fail("参数非法");
}
// ID 范围校验
if (id > MAX_USER_ID) {
return Result.fail("用户不存在");
}
return Result.success(userService.getUserById(id));
}
}
三、缓存击穿
3.1 问题描述
缓存击穿:某个热点 Key 在过期的瞬间,大量并发请求同时访问,全部打到数据库。
3.2 解决方案
方案一:互斥锁(分布式锁)
@Service
public class CacheBreakdownMutexService {
@Autowired
private StringRedisTemplate redisTemplate;
@Autowired
private ProductMapper productMapper;
private static final String LOCK_PREFIX = "lock:";
public Product getProductById(Long id) {
String key = "product:" + id;
String cacheValue = redisTemplate.opsForValue().get(key);
// 缓存命中
if (cacheValue != null) {
return JSON.parseObject(cacheValue, Product.class);
}
// 缓存未命中,获取分布式锁
String lockKey = LOCK_PREFIX + key;
String lockValue = UUID.randomUUID().toString();
try {
// 尝试获取锁
Boolean locked = redisTemplate.opsForValue()
.setIfAbsent(lockKey, lockValue, 10, TimeUnit.SECONDS);
if (Boolean.TRUE.equals(locked)) {
// 获取锁成功,查询数据库
Product product = productMapper.selectById(id);
if (product != null) {
// 设置缓存
redisTemplate.opsForValue().set(key,
JSON.toJSONString(product), 1, TimeUnit.HOURS);
}
return product;
} else {
// 获取锁失败,等待后重试
Thread.sleep(50);
return getProductById(id); // 递归重试
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return null;
} finally {
// 释放锁(使用 Lua 脚本保证原子性)
String script =
"if redis.call('get', KEYS[1]) == ARGV[1] then " +
" return redis.call('del', KEYS[1]) " +
"else " +
" return 0 " +
"end";
redisTemplate.execute(new DefaultRedisScript<>(script, Long.class),
Collections.singletonList(lockKey), lockValue);
}
}
}
方案二:逻辑过期
@Data
public class CacheData<T> {
private T data;
private Long expireTime; // 逻辑过期时间戳
}
@Service
public class LogicalExpireService {
@Autowired
private StringRedisTemplate redisTemplate;
@Autowired
private ProductMapper productMapper;
private static final ExecutorService EXECUTOR =
Executors.newFixedThreadPool(10);
public Product getProductById(Long id) {
String key = "product:" + id;
String cacheValue = redisTemplate.opsForValue().get(key);
// 缓存不存在(冷启动情况)
if (cacheValue == null) {
return loadAndCache(id, key);
}
// 解析缓存数据
CacheData<Product> cacheData = JSON.parseObject(cacheValue,
new TypeReference<CacheData<Product>>() {});
Product product = cacheData.getData();
Long expireTime = cacheData.getExpireTime();
// 检查逻辑过期时间
if (System.currentTimeMillis() < expireTime) {
// 未过期,直接返回
return product;
}
// 已过期,尝试获取锁进行异步更新
String lockKey = "lock:" + key;
Boolean locked = redisTemplate.opsForValue()
.setIfAbsent(lockKey, "1", 10, TimeUnit.SECONDS);
if (Boolean.TRUE.equals(locked)) {
// 获取锁成功,异步更新缓存
EXECUTOR.submit(() -> {
try {
loadAndCache(id, key);
} finally {
redisTemplate.delete(lockKey);
}
});
}
// 无论是否更新,先返回旧数据
return product;
}
private Product loadAndCache(Long id, String key) {
Product product = productMapper.selectById(id);
if (product != null) {
CacheData<Product> cacheData = new CacheData<>();
cacheData.setData(product);
// 设置逻辑过期时间:1小时后
cacheData.setExpireTime(System.currentTimeMillis() + 3600 * 1000);
// 缓存不设置 TTL,永不过期
redisTemplate.opsForValue().set(key, JSON.toJSONString(cacheData));
}
return product;
}
}
方案对比
| 方案 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|
| 互斥锁 | 保证数据一致性 | 有等待时间,性能略低 | 数据一致性要求高 |
| 逻辑过期 | 无等待,性能高 | 可能返回过期数据 | 允许短暂数据不一致 |
四、缓存雪崩
4.1 问题描述
缓存雪崩:大量 Key 同时过期,或 Redis 服务宕机,导致大量请求直接打到数据库。
4.2 解决方案
方案一:过期时间随机化
@Service
public class CacheAvalancheService {
@Autowired
private StringRedisTemplate redisTemplate;
private static final Random random = new Random();
public void cacheWithRandomExpire(String key, String value, long baseSeconds) {
// 基础过期时间 + 随机时间(0-300秒)
long randomSeconds = baseSeconds + random.nextInt(300);
redisTemplate.opsForValue().set(key, value, randomSeconds, TimeUnit.SECONDS);
}
// 批量缓存时使用不同过期时间
public void batchCache(Map<String, String> data, long baseSeconds) {
data.forEach((key, value) -> {
long randomSeconds = baseSeconds + random.nextInt(300);
redisTemplate.opsForValue().set(key, value, randomSeconds, TimeUnit.SECONDS);
});
}
}
方案二:多级缓存
@Service
public class MultiLevelCacheService {
@Autowired
private StringRedisTemplate redisTemplate;
@Autowired
private ProductMapper productMapper;
// 本地缓存(Caffeine)
private final Cache<String, Product> localCache = Caffeine.newBuilder()
.maximumSize(10000)
.expireAfterWrite(5, TimeUnit.MINUTES)
.build();
public Product getProductById(Long id) {
String key = "product:" + id;
// 1. 查询本地缓存
Product product = localCache.getIfPresent(key);
if (product != null) {
return product;
}
// 2. 查询 Redis
String cacheValue = redisTemplate.opsForValue().get(key);
if (cacheValue != null) {
product = JSON.parseObject(cacheValue, Product.class);
// 回填本地缓存
localCache.put(key, product);
return product;
}
// 3. 查询数据库
product = productMapper.selectById(id);
if (product != null) {
// 回填 Redis(随机过期时间)
long expire = 3600 + new Random().nextInt(300);
redisTemplate.opsForValue().set(key, JSON.toJSONString(product),
expire, TimeUnit.SECONDS);
// 回填本地缓存
localCache.put(key, product);
}
return product;
}
}
方案三:熔断降级
@Service
public class CircuitBreakerService {
@Autowired
private StringRedisTemplate redisTemplate;
// 使用 Resilience4j 熔断器
private final CircuitBreaker circuitBreaker = CircuitBreaker.ofDefaults("redis");
public String getValue(String key) {
// 通过熔断器调用 Redis
return circuitBreaker.executeSupplier(() ->
redisTemplate.opsForValue().get(key)
);
}
// 或使用 Sentinel 限流
@SentinelResource(value = "getProduct",
fallback = "getProductFallback",
blockHandler = "getProductBlockHandler")
public Product getProduct(Long id) {
// 正常逻辑
return productService.getById(id);
}
// 降级方法
public Product getProductFallback(Long id, Throwable e) {
// 返回默认值或从备份数据源获取
return new Product(id, "默认商品", BigDecimal.ZERO);
}
// 限流处理
public Product getProductBlockHandler(Long id, BlockException e) {
throw new RuntimeException("系统繁忙,请稍后重试");
}
}
方案四:Redis 高可用
# 使用 Redis Cluster 或 哨兵模式保证高可用
# 哨兵配置
sentinel monitor mymaster 192.168.1.100 6379 2
sentinel down-after-milliseconds mymaster 5000
sentinel failover-timeout mymaster 60000
五、大 Key 问题
5.1 什么是大 Key
| 类型 | 判定标准 |
|---|---|
| String | Value > 10KB |
| Hash/Set/ZSet/List | 元素数量 > 10000 或总大小 > 10MB |
危害:
- 内存分布不均匀
- 阻塞其他请求
- 网络带宽占用
- 主从同步延迟
- 删除时阻塞
5.2 发现大 Key
# 方法1:redis-cli --bigkeys
redis-cli --bigkeys -a password
# 方法2:使用 SCAN 遍历
redis-cli --scan --pattern '*' | while read key; do
type=$(redis-cli type "$key")
case $type in
string)
size=$(redis-cli strlen "$key")
;;
list)
size=$(redis-cli llen "$key")
;;
hash)
size=$(redis-cli hlen "$key")
;;
set)
size=$(redis-cli scard "$key")
;;
zset)
size=$(redis-cli zcard "$key")
;;
esac
echo "$key: $type, $size"
done
# 方法3:MEMORY USAGE 命令(Redis 4.0+)
MEMORY USAGE key
5.3 解决方案
方案一:拆分大 Key
@Service
public class BigKeyService {
@Autowired
private StringRedisTemplate redisTemplate;
// 大 Hash 拆分示例
// 原始:user:followers -> {uid1: info1, uid2: info2, ...} (100万个)
// 拆分:user:followers:0 -> 10000个, user:followers:1 -> 10000个, ...
private static final int SEGMENT_SIZE = 10000;
// 添加关注者
public void addFollower(Long userId, Long followerId, String info) {
int segment = (int) (followerId % SEGMENT_SIZE / SEGMENT_SIZE);
String key = "user:followers:" + userId + ":" + segment;
redisTemplate.opsForHash().put(key, followerId.toString(), info);
}
// 获取关注者
public String getFollower(Long userId, Long followerId) {
int segment = (int) (followerId % SEGMENT_SIZE / SEGMENT_SIZE);
String key = "user:followers:" + userId + ":" + segment;
return (String) redisTemplate.opsForHash().get(key, followerId.toString());
}
// 获取所有关注者(需遍历所有分片)
public Map<String, String> getAllFollowers(Long userId) {
Map<String, String> result = new HashMap<>();
int segment = 0;
while (true) {
String key = "user:followers:" + userId + ":" + segment;
if (!Boolean.TRUE.equals(redisTemplate.hasKey(key))) {
break;
}
Map<Object, Object> entries = redisTemplate.opsForHash().entries(key);
entries.forEach((k, v) -> result.put((String) k, (String) v));
segment++;
}
return result;
}
}
方案二:渐进式删除
@Service
public class BigKeyDeleteService {
@Autowired
private StringRedisTemplate redisTemplate;
// Redis 4.0+ 使用 UNLINK 异步删除
public void asyncDelete(String key) {
redisTemplate.unlink(key);
}
// 渐进式删除 Hash
public void deleteHashGradually(String key) {
long batchSize = 100;
ScanOptions options = ScanOptions.scanOptions().count(batchSize).build();
try (Cursor<Map.Entry<Object, Object>> cursor =
redisTemplate.opsForHash().scan(key, options)) {
List<Object> fields = new ArrayList<>();
while (cursor.hasNext()) {
fields.add(cursor.next().getKey());
if (fields.size() >= batchSize) {
redisTemplate.opsForHash().delete(key, fields.toArray());
fields.clear();
// 避免阻塞
try {
Thread.sleep(10);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
// 删除剩余字段
if (!fields.isEmpty()) {
redisTemplate.opsForHash().delete(key, fields.toArray());
}
}
// 删除 Key 本身
redisTemplate.delete(key);
}
// 渐进式删除 List
public void deleteListGradually(String key) {
while (true) {
Long removed = redisTemplate.opsForList().trim(key, 100, -1);
if (redisTemplate.opsForList().size(key) == 0) {
break;
}
try {
Thread.sleep(10);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
redisTemplate.delete(key);
}
}
方案三:压缩数据
@Service
public class CompressService {
@Autowired
private StringRedisTemplate redisTemplate;
// 使用 GZIP 压缩
public void setCompressed(String key, String value, long expire, TimeUnit unit) {
try {
byte[] compressed = compress(value.getBytes(StandardCharsets.UTF_8));
String base64 = Base64.getEncoder().encodeToString(compressed);
redisTemplate.opsForValue().set(key, base64, expire, unit);
} catch (IOException e) {
throw new RuntimeException("压缩失败", e);
}
}
public String getCompressed(String key) {
String base64 = redisTemplate.opsForValue().get(key);
if (base64 == null) {
return null;
}
try {
byte[] compressed = Base64.getDecoder().decode(base64);
return new String(decompress(compressed), StandardCharsets.UTF_8);
} catch (IOException e) {
throw new RuntimeException("解压失败", e);
}
}
private byte[] compress(byte[] data) throws IOException {
ByteArrayOutputStream bos = new ByteArrayOutputStream();
try (GZIPOutputStream gzip = new GZIPOutputStream(bos)) {
gzip.write(data);
}
return bos.toByteArray();
}
private byte[] decompress(byte[] compressed) throws IOException {
ByteArrayInputStream bis = new ByteArrayInputStream(compressed);
ByteArrayOutputStream bos = new ByteArrayOutputStream();
try (GZIPInputStream gzip = new GZIPInputStream(bis)) {
byte[] buffer = new byte[1024];
int len;
while ((len = gzip.read(buffer)) != -1) {
bos.write(buffer, 0, len);
}
}
return bos.toByteArray();
}
}
六、热 Key 问题
6.1 什么是热 Key
热 Key:某个 Key 访问频率极高,导致单节点压力过大。
场景:
- 热门商品详情
- 热搜话题
- 秒杀商品
- 明星微博
6.2 发现热 Key
# 方法1:redis-cli --hotkeys(需开启 LFU)
# 需要配置:maxmemory-policy volatile-lfu
redis-cli --hotkeys -a password
# 方法2:MONITOR 命令(生产慎用)
redis-cli monitor | head -n 1000
# 方法3:客户端统计
# 在代码中记录 Key 访问频率
6.3 解决方案
方案一:本地缓存
@Service
public class HotKeyLocalCacheService {
@Autowired
private StringRedisTemplate redisTemplate;
// 本地缓存热点数据
private final Cache<String, String> hotCache = Caffeine.newBuilder()
.maximumSize(1000)
.expireAfterWrite(10, TimeUnit.SECONDS) // 短过期时间
.build();
// 热点Key列表(可从配置中心获取或动态计算)
private Set<String> hotKeys = ConcurrentHashMap.newKeySet();
public String getValue(String key) {
// 判断是否是热点Key
if (hotKeys.contains(key)) {
// 优先从本地缓存获取
return hotCache.get(key, k -> redisTemplate.opsForValue().get(k));
}
// 非热点Key直接查Redis
return redisTemplate.opsForValue().get(key);
}
// 动态标记热点Key
public void markAsHot(String key) {
hotKeys.add(key);
}
public void unmarkAsHot(String key) {
hotKeys.remove(key);
hotCache.invalidate(key);
}
}
方案二:Key 分片(复制多份)
@Service
public class HotKeyShardService {
@Autowired
private StringRedisTemplate redisTemplate;
private static final int SHARD_COUNT = 10;
private static final Random random = new Random();
// 写入时复制到多个分片
public void setHotKey(String key, String value, long expire, TimeUnit unit) {
for (int i = 0; i < SHARD_COUNT; i++) {
String shardKey = key + ":" + i;
redisTemplate.opsForValue().set(shardKey, value, expire, unit);
}
}
// 读取时随机选择分片
public String getHotKey(String key) {
int shard = random.nextInt(SHARD_COUNT);
String shardKey = key + ":" + shard;
return redisTemplate.opsForValue().get(shardKey);
}
// 更新时需要更新所有分片
public void updateHotKey(String key, String value) {
for (int i = 0; i < SHARD_COUNT; i++) {
String shardKey = key + ":" + i;
Long ttl = redisTemplate.getExpire(shardKey, TimeUnit.SECONDS);
if (ttl != null && ttl > 0) {
redisTemplate.opsForValue().set(shardKey, value, ttl, TimeUnit.SECONDS);
}
}
}
}
方案三:读写分离 + 增加从节点
@Configuration
public class HotKeyReadConfig {
@Bean
public LettuceConnectionFactory readReplicaConnectionFactory() {
RedisStandaloneConfiguration config = new RedisStandaloneConfiguration();
config.setHostName("master_host");
config.setPort(6379);
LettuceClientConfiguration clientConfig = LettuceClientConfiguration.builder()
.readFrom(ReadFrom.REPLICA) // 强制从从节点读取
.build();
return new LettuceConnectionFactory(config, clientConfig);
}
}
七、问题解决方案总结
| 问题 | 核心原因 | 推荐方案 |
|---|---|---|
| 穿透 | 查询不存在的数据 | 布隆过滤器 + 空值缓存 |
| 击穿 | 热点 Key 过期 | 逻辑过期 或 互斥锁 |
| 雪崩 | 大量 Key 同时过期 | 随机过期时间 + 多级缓存 |
| 大 Key | 数据量过大 | 拆分 + 异步删除 |
| 热 Key | 访问过于集中 | 本地缓存 + Key 分片 |