Redis 核心知识笔记 - 第五部分:扩展功能
Redis 核心知识笔记 - 第五部分:扩展功能
一、Redis 事务
1.1 事务基础
Redis 事务通过 MULTI、EXEC、DISCARD、WATCH 命令实现。
1.2 基本命令
# 开启事务
MULTI
# 命令入队
SET user:1001:name "张三"
SET user:1001:age 25
INCR user:count
# 执行事务
EXEC
# 取消事务
DISCARD
# 监听 Key(乐观锁)
WATCH key [key ...]
# 取消监听
UNWATCH
1.3 事务特性
| 特性 | 描述 |
|---|---|
| 原子性 | 入队阶段发现错误(如命令不存在、参数个数错误)时,EXEC 会拒绝执行整个事务;一旦开始执行,队列中的命令会按顺序全部执行,即使其中某条命令在运行时出错 |
| 隔离性 | 事务执行期间不会被其他客户端打断 |
| 无回滚 | 运行时错误不会回滚已执行的命令 |
Warning
Redis 事务与传统 ACID 事务的区别:
- Redis 事务不支持回滚(运行时错误不会回滚)
- Redis 事务不保证持久性(取决于持久化配置)
1.4 事务错误处理
示例:语法错误
MULTI
SET key1 value1
SETX key2 value2 # 错误命令
SET key3 value3
EXEC # 整个事务被取消
示例:运行时错误
MULTI
SET key1 value1
INCR key1 # 运行时错误:key1 不是数字
SET key3 value3
EXEC # key1 和 key3 设置成功,INCR 返回错误
1.5 WATCH 乐观锁
1.6 Java 事务示例
/**
 * Examples of grouping several Redis commands into one unit of work:
 * a plain MULTI/EXEC transaction, a WATCH-guarded optimistic transaction,
 * and a server-side Lua script.
 */
@Service
public class RedisTransactionService {

    /**
     * Checks the balance stored under KEYS[1] and, when it covers ARGV[1],
     * moves that amount to KEYS[2]. Returns 1 on success and 0 otherwise.
     */
    private static final DefaultRedisScript<Long> TRANSFER_SCRIPT = new DefaultRedisScript<>(
            "local from = KEYS[1] " +
            "local to = KEYS[2] " +
            "local amount = tonumber(ARGV[1]) " +
            "local balance = tonumber(redis.call('get', from) or 0) " +
            "if balance >= amount then " +
            " redis.call('decrby', from, amount) " +
            " redis.call('incrby', to, amount) " +
            " return 1 " +
            "else " +
            " return 0 " +
            "end",
            Long.class);

    @Autowired
    private StringRedisTemplate redisTemplate;

    /**
     * Queues two SETs and one INCR between MULTI and EXEC.
     *
     * <p>Transaction support is deliberately not switched on for the shared
     * template: the MULTI/EXEC pair is issued explicitly inside the session
     * callback, and changing a template-wide flag from a business method would
     * affect every other user of the same bean.
     */
    public void basicTransaction() {
        redisTemplate.execute(new SessionCallback<List<Object>>() {
            @Override
            @SuppressWarnings({"rawtypes", "unchecked"})
            public List<Object> execute(RedisOperations operations) {
                operations.multi();
                operations.opsForValue().set("user:1001:name", "张三");
                operations.opsForValue().set("user:1001:age", "25");
                operations.opsForValue().increment("user:count");
                return operations.exec();
            }
        });
    }

    /**
     * Transfers {@code amount} between two accounts using WATCH as an
     * optimistic lock on the source account.
     *
     * @param fromAccount account to debit (stored under {@code account:<id>})
     * @param toAccount   account to credit (stored under {@code account:<id>})
     * @param amount      amount to move; must not be negative
     * @return {@code true} when EXEC returned results; {@code false} when the
     *         source account is missing, the balance is insufficient, or EXEC
     *         returned no results
     * @throws IllegalArgumentException if {@code amount} is negative
     * @throws NumberFormatException    if the stored balance is not an integer
     */
    public boolean transfer(String fromAccount, String toAccount, int amount) {
        requireNonNegative(amount);
        String fromKey = "account:" + fromAccount;
        String toKey = "account:" + toAccount;

        Boolean committed = redisTemplate.execute(new SessionCallback<Boolean>() {
            @Override
            @SuppressWarnings({"rawtypes", "unchecked"})
            public Boolean execute(RedisOperations operations) {
                operations.watch(fromKey);

                long balance;
                try {
                    String balanceStr = (String) operations.opsForValue().get(fromKey);
                    if (balanceStr == null) {
                        // Unknown source account: nothing to debit.
                        operations.unwatch();
                        return false;
                    }
                    balance = Long.parseLong(balanceStr);
                } catch (NumberFormatException e) {
                    // Do not leave the WATCH armed when bailing out with an error.
                    operations.unwatch();
                    throw e;
                }

                if (balance < amount) {
                    operations.unwatch();
                    return false;
                }

                operations.multi();
                operations.opsForValue().decrement(fromKey, amount);
                operations.opsForValue().increment(toKey, amount);
                List<Object> results = operations.exec();
                return results != null && !results.isEmpty();
            }
        });
        return Boolean.TRUE.equals(committed);
    }

    /**
     * Performs the same transfer inside a single Lua script, so the balance
     * check and both updates run as one server-side call.
     *
     * @param fromAccount account to debit
     * @param toAccount   account to credit
     * @param amount      amount to move; must not be negative
     * @return {@code true} when the script reported a successful transfer
     * @throws IllegalArgumentException if {@code amount} is negative
     */
    public boolean transferWithLua(String fromAccount, String toAccount, int amount) {
        requireNonNegative(amount);
        Long result = redisTemplate.execute(
                TRANSFER_SCRIPT,
                Arrays.asList("account:" + fromAccount, "account:" + toAccount),
                String.valueOf(amount));
        return result != null && result == 1;
    }

    /** Rejects negative amounts, which would otherwise move money in the opposite direction. */
    private static void requireNonNegative(int amount) {
        if (amount < 0) {
            throw new IllegalArgumentException("amount must not be negative: " + amount);
        }
    }
}
二、分布式锁
2.1 分布式锁需求
2.2 基础实现(SETNX)
/**
 * Minimal distributed lock built on {@code SET key value NX EX} for acquisition
 * and a compare-and-delete Lua script for release.
 */
@Service
public class SimpleDistributedLock {

    private static final String LOCK_PREFIX = "lock:";

    /** Retries allowed after the first failed attempt. */
    private static final int MAX_RETRIES = 30;
    /** Pause between two attempts, in milliseconds (30 x 100 ms = about 3 s in total). */
    private static final long RETRY_INTERVAL_MS = 100;
    /** Lock time-to-live used by {@link #doBusinessWithLock(String)}, in seconds. */
    private static final long LOCK_TTL_SECONDS = 30;

    /** Deletes KEYS[1] only when its value equals ARGV[1]; returns the number of deleted keys. */
    private static final DefaultRedisScript<Long> RELEASE_SCRIPT = new DefaultRedisScript<>(
            "if redis.call('get', KEYS[1]) == ARGV[1] then " +
            " return redis.call('del', KEYS[1]) " +
            "else " +
            " return 0 " +
            "end",
            Long.class);

    @Autowired
    private StringRedisTemplate redisTemplate;

    /**
     * Tries once to acquire the lock.
     *
     * @param lockKey    lock name (prefixed with {@code lock:})
     * @param requestId  caller token, checked again on release
     * @param expireTime time-to-live in seconds
     * @return {@code true} when the key was set by this call
     */
    public boolean tryLock(String lockKey, String requestId, long expireTime) {
        Boolean created = redisTemplate.opsForValue()
                .setIfAbsent(LOCK_PREFIX + lockKey, requestId, expireTime, TimeUnit.SECONDS);
        return Boolean.TRUE.equals(created);
    }

    /**
     * Releases the lock when it is still owned by {@code requestId}.
     *
     * @param lockKey   lock name (prefixed with {@code lock:})
     * @param requestId token that was passed to {@link #tryLock}
     * @return {@code true} when the key was deleted by this call
     */
    public boolean unlock(String lockKey, String requestId) {
        Long deleted = redisTemplate.execute(
                RELEASE_SCRIPT,
                Collections.singletonList(LOCK_PREFIX + lockKey),
                requestId);
        return deleted != null && deleted == 1;
    }

    /**
     * Usage example: waits up to about three seconds for the order lock, runs
     * the business logic and releases the lock afterwards.
     *
     * <p>The lock is released only when it was actually obtained. A timeout or
     * an interrupt therefore no longer triggers a pointless round trip, and a
     * failing release cannot mask the timeout exception.
     *
     * @param orderId order to process
     * @throws RuntimeException when the lock cannot be obtained in time
     */
    public void doBusinessWithLock(String orderId) {
        String lockKey = "order:" + orderId;
        String requestId = UUID.randomUUID().toString();
        boolean acquired = false;
        try {
            int failedAttempts = 0;
            while (!tryLock(lockKey, requestId, LOCK_TTL_SECONDS)) {
                if (++failedAttempts > MAX_RETRIES) {
                    throw new RuntimeException("获取锁超时");
                }
                Thread.sleep(RETRY_INTERVAL_MS);
            }
            acquired = true;

            processOrder(orderId);
        } catch (InterruptedException e) {
            // Preserve the interrupt for the caller; the business logic is skipped.
            Thread.currentThread().interrupt();
        } finally {
            if (acquired) {
                unlock(lockKey, requestId);
            }
        }
    }
}
2.3 可重入锁实现
/**
 * Distributed lock with per-thread re-entrancy: the Redis key is set once and
 * nested acquisitions by the same thread are counted locally.
 *
 * <p>NOTE(review): a nested acquisition only consults the local counter. It
 * neither checks that the Redis key still exists nor extends its TTL, so the
 * counter can outlive an expired lock — confirm the TTL always exceeds the
 * critical section.
 */
@Service
public class ReentrantDistributedLock {

    private static final String LOCK_PREFIX = "lock:";

    /** Deletes KEYS[1] only when its value equals ARGV[1]; returns the number of deleted keys. */
    private static final DefaultRedisScript<Long> RELEASE_SCRIPT = new DefaultRedisScript<>(
            "if redis.call('get', KEYS[1]) == ARGV[1] then " +
            " return redis.call('del', KEYS[1]) " +
            "else " +
            " return 0 " +
            "end",
            Long.class);

    /** Per-thread hold count, keyed by the full Redis key. */
    private final ThreadLocal<Map<String, Integer>> lockCountMap =
            ThreadLocal.withInitial(HashMap::new);

    @Autowired
    private StringRedisTemplate redisTemplate;

    /**
     * Acquires the lock, or increments the hold count when the calling thread
     * already holds it.
     *
     * @param lockKey    lock name (prefixed with {@code lock:})
     * @param requestId  owner token stored as the key's value on first acquisition
     * @param expireTime time-to-live in seconds, applied on first acquisition only
     * @return {@code true} when the lock is held by the calling thread afterwards
     */
    public boolean tryLock(String lockKey, String requestId, long expireTime) {
        String key = LOCK_PREFIX + lockKey;
        Map<String, Integer> holdCounts = lockCountMap.get();

        if (holdCounts.containsKey(key)) {
            holdCounts.merge(key, 1, Integer::sum);
            return true;
        }

        Boolean created = redisTemplate.opsForValue()
                .setIfAbsent(key, requestId, expireTime, TimeUnit.SECONDS);
        if (Boolean.TRUE.equals(created)) {
            holdCounts.put(key, 1);
            return true;
        }

        releaseThreadLocalIfEmpty(holdCounts);
        return false;
    }

    /**
     * Decrements the hold count and deletes the Redis key when the outermost
     * hold is released.
     *
     * @param lockKey   lock name (prefixed with {@code lock:})
     * @param requestId owner token; the key is deleted only when it still matches
     * @return {@code false} when the calling thread holds no such lock or the
     *         Redis key was not deleted; {@code true} otherwise
     */
    public boolean unlock(String lockKey, String requestId) {
        String key = LOCK_PREFIX + lockKey;
        Map<String, Integer> holdCounts = lockCountMap.get();

        Integer held = holdCounts.get(key);
        if (held == null) {
            releaseThreadLocalIfEmpty(holdCounts);
            return false;
        }
        if (held > 1) {
            // Still inside a nested acquisition: keep the Redis key.
            holdCounts.put(key, held - 1);
            return true;
        }

        holdCounts.remove(key);
        releaseThreadLocalIfEmpty(holdCounts);

        Long deleted = redisTemplate.execute(
                RELEASE_SCRIPT,
                Collections.singletonList(key),
                requestId);
        return deleted != null && deleted == 1;
    }

    /** Drops the thread-local map once it is empty so pooled threads do not retain it. */
    private void releaseThreadLocalIfEmpty(Map<String, Integer> holdCounts) {
        if (holdCounts.isEmpty()) {
            lockCountMap.remove();
        }
    }
}
2.4 自动续期(看门狗)
/**
 * Distributed lock whose TTL is periodically extended by a background task
 * ("watchdog") for as long as the owner token still matches.
 */
@Service
public class WatchDogLock {

    private static final String LOCK_PREFIX = "lock:";

    /** Re-applies the TTL in ARGV[2] (ms) when KEYS[1] still holds ARGV[1]; returns 0 otherwise. */
    private static final DefaultRedisScript<Long> RENEW_SCRIPT = new DefaultRedisScript<>(
            "if redis.call('get', KEYS[1]) == ARGV[1] then " +
            " return redis.call('pexpire', KEYS[1], ARGV[2]) " +
            "else " +
            " return 0 " +
            "end",
            Long.class);

    /** Deletes KEYS[1] only when its value equals ARGV[1]; returns the number of deleted keys. */
    private static final DefaultRedisScript<Long> RELEASE_SCRIPT = new DefaultRedisScript<>(
            "if redis.call('get', KEYS[1]) == ARGV[1] then " +
            " return redis.call('del', KEYS[1]) " +
            "else " +
            " return 0 " +
            "end",
            Long.class);

    private static final ScheduledExecutorService scheduler =
            Executors.newScheduledThreadPool(5);

    /**
     * Running renewal tasks, keyed by lock key plus owner token so that one
     * owner can never cancel another owner's watchdog.
     */
    private final ConcurrentHashMap<String, ScheduledFuture<?>> renewalTasks =
            new ConcurrentHashMap<>();

    @Autowired
    private StringRedisTemplate redisTemplate;

    /**
     * Tries once to acquire the lock and, on success, starts its watchdog.
     *
     * @param lockKey    lock name (prefixed with {@code lock:})
     * @param requestId  owner token
     * @param expireTime time-to-live of the key
     * @param unit       unit of {@code expireTime}
     * @return {@code true} when the lock was acquired
     */
    public boolean tryLockWithWatchdog(String lockKey, String requestId,
                                       long expireTime, TimeUnit unit) {
        String key = LOCK_PREFIX + lockKey;
        Boolean created = redisTemplate.opsForValue()
                .setIfAbsent(key, requestId, expireTime, unit);
        if (!Boolean.TRUE.equals(created)) {
            return false;
        }
        startWatchdog(key, requestId, expireTime, unit);
        return true;
    }

    /**
     * Schedules the renewal task at one third of the TTL.
     *
     * <p>The period is clamped to at least 1 ms, because the scheduler rejects
     * a non-positive period and a TTL below 3 ms would otherwise make this
     * method throw after the lock was already taken.
     */
    private void startWatchdog(String key, String requestId,
                               long expireTime, TimeUnit unit) {
        long expireMs = unit.toMillis(expireTime);
        long renewalInterval = Math.max(1L, expireMs / 3);
        String taskId = taskId(key, requestId);

        Runnable renew = () -> {
            Long renewed;
            try {
                renewed = redisTemplate.execute(
                        RENEW_SCRIPT,
                        Collections.singletonList(key),
                        requestId, String.valueOf(expireMs));
            } catch (RuntimeException ignored) {
                // An exception escaping a fixed-rate task cancels all further
                // runs; keep the watchdog alive and retry on the next tick.
                return;
            }
            if (renewed == null || renewed == 0) {
                // The key is gone or owned by someone else: stop renewing.
                stopWatchdog(taskId);
            }
        };

        // Register atomically so a concurrent stopWatchdog cannot miss the new future.
        renewalTasks.compute(taskId, (id, previous) -> {
            if (previous != null) {
                previous.cancel(false);
            }
            return scheduler.scheduleAtFixedRate(
                    renew, renewalInterval, renewalInterval, TimeUnit.MILLISECONDS);
        });
    }

    /** Cancels and forgets the renewal task registered under {@code taskId}, if any. */
    private void stopWatchdog(String taskId) {
        ScheduledFuture<?> future = renewalTasks.remove(taskId);
        if (future != null) {
            future.cancel(false);
        }
    }

    /**
     * Stops this owner's watchdog and deletes the key when it is still owned
     * by {@code requestId}. A call with a foreign token leaves the real
     * holder's watchdog untouched.
     *
     * @param lockKey   lock name (prefixed with {@code lock:})
     * @param requestId owner token
     * @return {@code true} when the key was deleted by this call
     */
    public boolean unlock(String lockKey, String requestId) {
        String key = LOCK_PREFIX + lockKey;
        stopWatchdog(taskId(key, requestId));

        Long deleted = redisTemplate.execute(
                RELEASE_SCRIPT,
                Collections.singletonList(key),
                requestId);
        return deleted != null && deleted == 1;
    }

    /** Builds the registry key for one (lock key, owner token) pair. */
    private static String taskId(String key, String requestId) {
        return key + '|' + requestId;
    }
}
2.5 使用 Redisson(推荐)
Redisson 是 Redis 的 Java 客户端,提供了完善的分布式锁实现。
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.23.5</version>
</dependency>
/**
 * Registers a {@link RedissonClient} that talks to a single Redis server.
 */
@Configuration
public class RedissonConfig {

    /**
     * Builds the client from an in-code single-server configuration
     * (address, password and database index are fixed values).
     *
     * @return the Redisson client created from that configuration
     */
    @Bean
    public RedissonClient redissonClient() {
        final Config singleServer = new Config();
        singleServer.useSingleServer()
                .setDatabase(0)
                .setPassword("password")
                .setAddress("redis://127.0.0.1:6379");
        return Redisson.create(singleServer);
    }
}
/**
 * Usage examples for the synchronizers exposed by {@link RedissonClient}:
 * lock, fair lock, read/write lock, semaphore and count-down latch.
 */
@Service
public class RedissonLockService {

    @Autowired
    private RedissonClient redissonClient;

    /**
     * Waits up to 10 seconds for the lock and holds it for at most 30 seconds.
     * The business logic is skipped when the lock is not obtained.
     *
     * @param lockName lock name (prefixed with {@code lock:})
     */
    public void doWithLock(String lockName) {
        RLock lock = redissonClient.getLock("lock:" + lockName);
        try {
            boolean acquired = lock.tryLock(10, 30, TimeUnit.SECONDS);
            if (acquired) {
                doSomething();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            // Covers both "not acquired" and "lease already expired".
            if (lock.isHeldByCurrentThread()) {
                lock.unlock();
            }
        }
    }

    /**
     * Locks without a lease time, which lets Redisson's watchdog keep
     * extending the lock until it is released.
     *
     * @param lockName lock name (prefixed with {@code lock:})
     */
    public void doWithWatchdog(String lockName) {
        RLock lock = redissonClient.getLock("lock:" + lockName);
        // Acquire outside the try block: if lock() itself fails there is
        // nothing to release, and unlock() must not run.
        lock.lock();
        try {
            doSomething();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Same as {@link #doWithWatchdog(String)} but with a fair lock.
     *
     * @param lockName lock name (prefixed with {@code lock:})
     */
    public void doWithFairLock(String lockName) {
        RLock fairLock = redissonClient.getFairLock("lock:" + lockName);
        fairLock.lock();
        try {
            doSomething();
        } finally {
            fairLock.unlock();
        }
    }

    /**
     * Takes the read lock and then the write lock of one read/write lock,
     * one after the other.
     *
     * @param lockName lock name (prefixed with {@code rwLock:})
     */
    public void readWriteLock(String lockName) {
        RReadWriteLock rwLock = redissonClient.getReadWriteLock("rwLock:" + lockName);

        RLock readLock = rwLock.readLock();
        readLock.lock();
        try {
            // read operations go here
        } finally {
            readLock.unlock();
        }

        RLock writeLock = rwLock.writeLock();
        writeLock.lock();
        try {
            // write operations go here
        } finally {
            writeLock.unlock();
        }
    }

    /**
     * Acquires one permit from a semaphore initialised with 100 permits and
     * gives it back afterwards. An interrupted {@code acquire()} returns
     * without releasing, so the permit count cannot grow past its initial size.
     */
    public void semaphoreExample() {
        RSemaphore semaphore = redissonClient.getSemaphore("semaphore:parking");
        semaphore.trySetPermits(100);

        try {
            semaphore.acquire();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
        }

        try {
            // guarded work goes here
        } finally {
            semaphore.release();
        }
    }

    /**
     * Sets the latch count to 5, waits for it to reach zero and then calls
     * {@code countDown()} once.
     *
     * <p>NOTE(review): the trailing {@code countDown()} only illustrates the
     * API. In real use it is called by the other participants, because this
     * thread is blocked in {@code await()} until the count is zero.
     *
     * @throws InterruptedException if the wait is interrupted
     */
    public void countDownLatchExample() throws InterruptedException {
        RCountDownLatch latch = redissonClient.getCountDownLatch("latch:task");
        latch.trySetCount(5);

        latch.await();

        latch.countDown();
    }
}
三、RedLock(红锁)
3.1 为什么需要 RedLock
单 Redis 实例的分布式锁存在单点故障问题:
3.2 RedLock 算法
核心思想:在多个独立的 Redis 实例上获取锁,只有在多数实例上获取成功才算成功。
3.3 RedLock 算法步骤
3.4 Redisson RedLock 实现
/**
 * Declares three {@link RedissonClient} beans, one per Redis server address.
 */
@Configuration
public class RedLockConfig {

    /** @return client for the first node */
    @Bean
    public RedissonClient redissonClient1() {
        return singleServerClient("redis://192.168.1.101:6379");
    }

    /** @return client for the second node */
    @Bean
    public RedissonClient redissonClient2() {
        return singleServerClient("redis://192.168.1.102:6379");
    }

    /** @return client for the third node */
    @Bean
    public RedissonClient redissonClient3() {
        return singleServerClient("redis://192.168.1.103:6379");
    }

    /** Creates a client from a fresh single-server configuration for {@code address}. */
    private static RedissonClient singleServerClient(String address) {
        Config nodeConfig = new Config();
        nodeConfig.useSingleServer().setAddress(address);
        return Redisson.create(nodeConfig);
    }
}
/**
 * Locking across three separate Redis servers: once through Redisson's
 * combined lock, once as a hand-written walk-through of the quorum idea.
 */
@Service
public class RedLockService {

    @Autowired
    private RedissonClient redissonClient1;
    @Autowired
    private RedissonClient redissonClient2;
    @Autowired
    private RedissonClient redissonClient3;

    /**
     * Runs the business logic while holding a combined lock over all three nodes.
     *
     * <p>The lock is released only when {@code tryLock} reported success.
     * Unlocking a combined lock that was never acquired raises an error from
     * the underlying locks.
     *
     * <p>NOTE(review): {@code getMultiLock} yields a multi-lock which, per
     * Redisson's documentation, needs every underlying lock rather than a
     * majority — confirm whether a quorum-based RedLock was intended.
     *
     * @param lockName lock name (prefixed with {@code lock:})
     */
    public void doWithRedLock(String lockName) {
        RLock lock1 = redissonClient1.getLock("lock:" + lockName);
        RLock lock2 = redissonClient2.getLock("lock:" + lockName);
        RLock lock3 = redissonClient3.getLock("lock:" + lockName);

        RLock redLock = redissonClient1.getMultiLock(lock1, lock2, lock3);
        boolean acquired = false;
        try {
            acquired = redLock.tryLock(10, 30, TimeUnit.SECONDS);
            if (acquired) {
                doSomething();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            if (acquired) {
                redLock.unlock();
            }
        }
    }

    /**
     * Walk-through of the quorum algorithm: try each node in turn, then accept
     * the result only if enough nodes granted the lock and the TTL has not
     * been used up by the acquisition itself.
     *
     * <p>NOTE(review): on success the acquired locks are not handed back to
     * the caller, so this method offers no way to release them; they expire
     * after {@code ttlMs}.
     *
     * @param lockName  lock name (prefixed with {@code lock:})
     * @param requestId unused by this implementation; kept for signature compatibility
     * @param ttlMs     lease time per node, in milliseconds
     * @param quorum    minimum number of nodes that must grant the lock
     * @return {@code true} when at least {@code quorum} nodes granted the lock
     *         within the TTL; {@code false} otherwise (partial locks are released)
     */
    public boolean tryRedLock(String lockName, String requestId,
                              long ttlMs, int quorum) {
        List<RedissonClient> clients = Arrays.asList(
                redissonClient1, redissonClient2, redissonClient3
        );
        // Monotonic clock: elapsed time must not be affected by wall-clock adjustments.
        long startNanos = System.nanoTime();
        List<RLock> acquiredLocks = new ArrayList<>();
        boolean interrupted = false;

        for (RedissonClient client : clients) {
            RLock lock = client.getLock("lock:" + lockName);
            try {
                boolean acquired = lock.tryLock(
                        ttlMs / clients.size(), // wait budget for a single node
                        ttlMs,
                        TimeUnit.MILLISECONDS
                );
                if (acquired) {
                    acquiredLocks.add(lock);
                }
            } catch (InterruptedException e) {
                // Restore the flag and give up instead of silently carrying on.
                Thread.currentThread().interrupt();
                interrupted = true;
                break;
            } catch (RuntimeException ignored) {
                // A single unreachable node must not fail the whole attempt.
            }
        }

        long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
        long validTime = ttlMs - elapsedMs;

        if (!interrupted && acquiredLocks.size() >= quorum && validTime > 0) {
            return true;
        }

        for (RLock lock : acquiredLocks) {
            try {
                lock.unlock();
            } catch (RuntimeException ignored) {
                // Best effort: the lease expires on its own if the release fails.
            }
        }
        return false;
    }
}
3.5 RedLock 争议
Caution
Martin Kleppmann 的质疑:
- 时钟跳跃问题:系统时钟不同步可能导致锁提前过期
- GC 暂停问题:长时间 GC 可能导致锁过期后仍在执行
- 网络延迟问题:获取锁后,网络延迟可能消耗大量有效时间
建议:
- 对于大多数场景,单 Redis 实例 + 主从 + 哨兵已足够
- 如果需要极高可靠性,考虑使用 Zookeeper 或 etcd
四、Lua 脚本
4.1 为什么使用 Lua 脚本
- 原子性:整个脚本作为一个原子操作执行
- 减少网络开销:多个命令合并为一次请求
- 复用性:脚本可以缓存在服务端
4.2 基本语法
-- 获取参数
-- KEYS[n]:键名参数
-- ARGV[n]:附加参数
-- 调用 Redis 命令
redis.call('SET', KEYS[1], ARGV[1])
local value = redis.call('GET', KEYS[1])
-- 返回值
return value
4.3 常用 Lua 脚本示例
/**
 * Collection of small Lua scripts executed through {@link StringRedisTemplate}:
 * compare-and-set, sliding-window rate limiting, stock deduction, delete by
 * pattern, and calling a pre-loaded script by its SHA.
 */
@Service
public class LuaScriptService {

    /** Script registered by {@link #initScriptCache()}. */
    private static final String CACHED_SCRIPT = "return redis.call('get', KEYS[1])";

    /** Sets KEYS[1] to ARGV[2] only when its current value equals ARGV[1]. */
    private static final DefaultRedisScript<Long> CAS_SCRIPT = new DefaultRedisScript<>(
            "if redis.call('get', KEYS[1]) == ARGV[1] then " +
            " redis.call('set', KEYS[1], ARGV[2]) " +
            " return 1 " +
            "else " +
            " return 0 " +
            "end",
            Long.class);

    /**
     * Sliding window over a sorted set scored by request time (ms).
     * ARGV: max requests, window seconds, now (ms), unique request token.
     */
    private static final DefaultRedisScript<Long> SLIDING_WINDOW_SCRIPT = new DefaultRedisScript<>(
            "local key = KEYS[1] " +
            "local max = tonumber(ARGV[1]) " +
            "local window = tonumber(ARGV[2]) " +
            "local now = tonumber(ARGV[3]) " +
            "local window_start = now - window * 1000 " +
            // drop requests that fell out of the window
            "redis.call('ZREMRANGEBYSCORE', key, 0, window_start) " +
            // count requests still inside the window
            "local count = redis.call('ZCARD', key) " +
            "if count < max then " +
            // record this request under a member unique to it
            " redis.call('ZADD', key, now, ARGV[3] .. ':' .. ARGV[4]) " +
            " redis.call('EXPIRE', key, window) " +
            " return 1 " +
            "else " +
            " return 0 " +
            "end",
            Long.class);

    /** Decrements KEYS[1] by ARGV[1] when enough stock is left; returns the remainder or -1. */
    private static final DefaultRedisScript<Long> DECR_STOCK_SCRIPT = new DefaultRedisScript<>(
            "local stock = tonumber(redis.call('get', KEYS[1]) or 0) " +
            "local quantity = tonumber(ARGV[1]) " +
            "if stock >= quantity then " +
            " redis.call('decrby', KEYS[1], quantity) " +
            " return stock - quantity " +
            "else " +
            " return -1 " +
            "end",
            Long.class);

    /** Deletes every key matching ARGV[1] and returns how many were deleted. */
    private static final DefaultRedisScript<Long> DELETE_BY_PATTERN_SCRIPT = new DefaultRedisScript<>(
            "local keys = redis.call('keys', ARGV[1]) " +
            "local count = 0 " +
            "for i, key in ipairs(keys) do " +
            " redis.call('del', key) " +
            " count = count + 1 " +
            "end " +
            "return count",
            Long.class);

    @Autowired
    private StringRedisTemplate redisTemplate;

    /** SHA of {@link #CACHED_SCRIPT}; volatile because it is written lazily. */
    private volatile String cachedScriptSha;

    /**
     * Atomic compare-and-set on a string key.
     *
     * @param key    key to update
     * @param expect value the key must currently hold
     * @param update value to store when the expectation holds
     * @return {@code true} when the value was replaced
     */
    public boolean compareAndSet(String key, String expect, String update) {
        Long result = redisTemplate.execute(
                CAS_SCRIPT,
                Collections.singletonList(key),
                expect, update);
        return result != null && result == 1;
    }

    /**
     * Sliding-window rate limiter.
     *
     * <p>Each admitted request is stored under a member made of the timestamp
     * plus a random token. Using the bare timestamp as the member would let
     * two requests in the same millisecond overwrite each other and be
     * counted once.
     *
     * @param key           sorted-set key that holds the window
     * @param maxRequests   maximum number of requests per window
     * @param windowSeconds window length in seconds
     * @return {@code true} when the request is admitted
     */
    public boolean isAllowed(String key, int maxRequests, int windowSeconds) {
        Long result = redisTemplate.execute(
                SLIDING_WINDOW_SCRIPT,
                Collections.singletonList(key),
                String.valueOf(maxRequests),
                String.valueOf(windowSeconds),
                String.valueOf(System.currentTimeMillis()),
                java.util.UUID.randomUUID().toString());
        return result != null && result == 1;
    }

    /**
     * Atomically deducts stock for a product.
     *
     * @param productId product whose counter lives under {@code stock:<id>}
     * @param quantity  units to deduct; must not be negative
     * @return {@code true} when enough stock was available and has been deducted
     * @throws IllegalArgumentException if {@code quantity} is negative (it
     *         would otherwise increase the stock)
     */
    public boolean decrStock(String productId, int quantity) {
        if (quantity < 0) {
            throw new IllegalArgumentException("quantity must not be negative: " + quantity);
        }
        Long result = redisTemplate.execute(
                DECR_STOCK_SCRIPT,
                Collections.singletonList("stock:" + productId),
                String.valueOf(quantity));
        return result != null && result >= 0;
    }

    /**
     * Deletes all keys matching a glob pattern.
     *
     * <p>NOTE(review): the script relies on the {@code KEYS} command, which
     * Redis documents as unsuitable for production keyspaces, and it touches
     * keys that were not passed through {@code KEYS[]} — consider an
     * incremental SCAN on the client side instead.
     *
     * @param pattern glob pattern passed to {@code KEYS}
     * @return number of deleted keys
     */
    public long deleteByPattern(String pattern) {
        Long result = redisTemplate.execute(
                DELETE_BY_PATTERN_SCRIPT,
                Collections.emptyList(),
                pattern);
        return result != null ? result : 0;
    }

    /**
     * Loads {@link #CACHED_SCRIPT} into the server-side script cache and
     * remembers its SHA for {@link #evalCachedScript(String)}.
     */
    public void initScriptCache() {
        cachedScriptSha = redisTemplate.execute((RedisCallback<String>) connection ->
                connection.scriptLoad(
                        CACHED_SCRIPT.getBytes(java.nio.charset.StandardCharsets.UTF_8))
        );
    }

    /**
     * Runs the pre-loaded script against {@code key} via EVALSHA. The script
     * is loaded on first use when {@link #initScriptCache()} has not been
     * called yet.
     *
     * @param key key to read
     * @return the value as UTF-8 text, or {@code null} when the script returned nothing
     */
    public String evalCachedScript(String key) {
        if (cachedScriptSha == null) {
            initScriptCache();
        }
        final String scriptSha = cachedScriptSha;
        return redisTemplate.execute((RedisCallback<String>) connection -> {
            byte[] result = connection.evalSha(
                    scriptSha,
                    ReturnType.VALUE,
                    1,
                    key.getBytes(java.nio.charset.StandardCharsets.UTF_8)
            );
            return result != null
                    ? new String(result, java.nio.charset.StandardCharsets.UTF_8)
                    : null;
        });
    }
}
五、其他高级特性
5.1 Pipeline(管道)
/**
 * Batch reads and writes sent through a Redis pipeline.
 */
@Service
public class PipelineService {

    @Autowired
    private StringRedisTemplate redisTemplate;

    /**
     * Writes every entry of {@code data} with one pipelined round of SET commands.
     *
     * @param data key/value pairs to store
     */
    public void batchSetWithPipeline(Map<String, String> data) {
        SessionCallback<Object> writeAll = new SessionCallback<Object>() {
            @Override
            public Object execute(RedisOperations operations) {
                for (Map.Entry<String, String> entry : data.entrySet()) {
                    operations.opsForValue().set(entry.getKey(), entry.getValue());
                }
                // The pipeline collects the replies itself; the callback result is unused.
                return null;
            }
        };
        redisTemplate.executePipelined(writeAll);
    }

    /**
     * Reads every key of {@code keys} with one pipelined round of GET commands.
     *
     * @param keys keys to read
     * @return the list produced by the pipelined execution
     */
    public List<Object> batchGetWithPipeline(List<String> keys) {
        SessionCallback<Object> readAll = new SessionCallback<Object>() {
            @Override
            public Object execute(RedisOperations operations) {
                for (String key : keys) {
                    operations.opsForValue().get(key);
                }
                return null;
            }
        };
        return redisTemplate.executePipelined(readAll);
    }
}
5.2 发布订阅(Pub/Sub)
// 消息监听器
/**
 * Pub/Sub listener that prints every received message together with its channel.
 */
@Component
public class RedisMessageListener implements MessageListener {

    /**
     * Decodes channel and body as UTF-8 and prints them. The charset is
     * explicit so the output does not depend on the JVM's platform default.
     *
     * @param message received message
     * @param pattern subscription pattern that matched; unused here
     */
    @Override
    public void onMessage(Message message, byte[] pattern) {
        String channel = new String(message.getChannel(), java.nio.charset.StandardCharsets.UTF_8);
        String body = new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8);
        System.out.println("收到消息 - 频道: " + channel + ", 内容: " + body);
    }
}
// 配置订阅
/**
 * Wires the message listener into a listener container subscribed to the
 * {@code news.*} pattern.
 */
@Configuration
public class RedisPubSubConfig {

    /**
     * Creates the container that delivers messages to {@code listener}.
     *
     * @param factory  connection factory used by the container
     * @param listener listener to register for the {@code news.*} pattern
     * @return the configured container
     */
    @Bean
    public RedisMessageListenerContainer container(
            RedisConnectionFactory factory,
            RedisMessageListener listener) {
        final PatternTopic newsTopic = new PatternTopic("news.*");
        final RedisMessageListenerContainer listenerContainer = new RedisMessageListenerContainer();
        listenerContainer.setConnectionFactory(factory);
        listenerContainer.addMessageListener(listener, newsTopic);
        return listenerContainer;
    }
}
// 发布消息
/**
 * Thin publishing facade over {@link StringRedisTemplate}.
 */
@Service
public class MessagePublisher {

    @Autowired
    private StringRedisTemplate redisTemplate;

    /**
     * Sends {@code message} to {@code channel} through the template.
     *
     * @param channel target channel
     * @param message payload to send
     */
    public void publish(final String channel, final String message) {
        this.redisTemplate.convertAndSend(channel, message);
    }
}
5.3 Stream(消息队列 - Redis 5.0+)
/**
 * Basic Redis Stream operations: append, create a consumer group, read as a
 * group member and acknowledge.
 */
@Service
public class RedisStreamService {

    @Autowired
    private StringRedisTemplate redisTemplate;

    /**
     * Appends a message to a stream.
     *
     * @param streamKey stream to append to
     * @param message   field/value pairs of the entry
     * @return the id of the new record, as a string
     */
    public String sendMessage(String streamKey, Map<String, String> message) {
        RecordId recordId = redisTemplate.opsForStream()
                .add(streamKey, message);
        return recordId.toString();
    }

    /**
     * Creates a consumer group and treats "group already exists" as success.
     *
     * <p>Only the server's {@code BUSYGROUP} reply is ignored. Any other
     * failure, such as a connection problem, is rethrown instead of being
     * mistaken for an existing group.
     *
     * @param streamKey stream the group belongs to
     * @param groupName name of the group
     */
    public void createConsumerGroup(String streamKey, String groupName) {
        try {
            redisTemplate.opsForStream()
                    .createGroup(streamKey, groupName);
        } catch (RuntimeException e) {
            if (!isGroupAlreadyExists(e)) {
                throw e;
            }
            // The group is already there: nothing to do.
        }
    }

    /**
     * Reads up to {@code count} new messages for a consumer, blocking for at
     * most two seconds.
     *
     * @param streamKey    stream to read from
     * @param groupName    consumer group
     * @param consumerName consumer inside the group
     * @param count        maximum number of records to fetch
     * @return the records returned by the read
     */
    public List<MapRecord<String, Object, Object>> consumeMessages(
            String streamKey, String groupName, String consumerName, int count) {
        return redisTemplate.opsForStream().read(
                Consumer.from(groupName, consumerName),
                StreamReadOptions.empty().count(count).block(Duration.ofSeconds(2)),
                StreamOffset.create(streamKey, ReadOffset.lastConsumed())
        );
    }

    /**
     * Acknowledges one record for a consumer group.
     *
     * @param streamKey stream the record belongs to
     * @param groupName consumer group
     * @param recordId  id of the record to acknowledge
     */
    public void ackMessage(String streamKey, String groupName, String recordId) {
        redisTemplate.opsForStream()
                .acknowledge(streamKey, groupName, recordId);
    }

    /** Walks the cause chain looking for the server's {@code BUSYGROUP} error text. */
    private static boolean isGroupAlreadyExists(Throwable error) {
        for (Throwable current = error; current != null; current = current.getCause()) {
            String text = current.getMessage();
            if (text != null && text.contains("BUSYGROUP")) {
                return true;
            }
        }
        return false;
    }
}
六、总结
功能选择建议
分布式锁对比
| 方案 | 实现复杂度 | 可靠性 | 性能 | 推荐场景 |
|---|---|---|---|---|
| SETNX | 低 | 中 | 高 | 简单场景 |
| Redisson | 低 | 高 | 高 | 大多数场景 |
| RedLock | 高 | 较高 | 中 | 极高可靠性 |
| Zookeeper | 高 | 最高 | 低 | 金融级场景 |