Redis

Redis 核心知识笔记 - 第五部分:扩展功能

Redis 核心知识笔记 - 第五部分:扩展功能

一、Redis 事务

1.1 事务基础

Redis 事务通过 MULTIEXECDISCARDWATCH 命令实现。

1.2 基本命令

# 开启事务
MULTI

# 命令入队
SET user:1001:name "张三"
SET user:1001:age 25
INCR user:count

# 执行事务
EXEC

# 取消事务
DISCARD

# 监听 Key(乐观锁)
WATCH key [key ...]

# 取消监听
UNWATCH

1.3 事务特性

特性描述
原子性事务中的命令要么全部执行,要么全部不执行(语法错误时)
隔离性事务执行期间不会被其他客户端打断
无回滚运行时错误不会回滚已执行的命令

Warning

Redis 事务与传统 ACID 事务的区别

  • Redis 事务不支持回滚(运行时错误不会回滚)
  • Redis 事务不保证持久性(取决于持久化配置)

1.4 事务错误处理

示例:语法错误

MULTI
SET key1 value1
SETX key2 value2    # 错误命令
SET key3 value3
EXEC                # 整个事务被取消

示例:运行时错误

MULTI
SET key1 value1
INCR key1           # 运行时错误:key1 不是数字
SET key3 value3
EXEC                # key1 和 key3 设置成功,INCR 返回错误

1.5 WATCH 乐观锁

1.6 Java 事务示例

@Service
public class RedisTransactionService {
    
    @Autowired
    private StringRedisTemplate redisTemplate;
    
    /**
     * 基本事务操作
     */
    public void basicTransaction() {
        // 开启事务
        redisTemplate.setEnableTransactionSupport(true);
        
        redisTemplate.execute(new SessionCallback<List<Object>>() {
            @Override
            public List<Object> execute(RedisOperations operations) {
                operations.multi();
                
                operations.opsForValue().set("user:1001:name", "张三");
                operations.opsForValue().set("user:1001:age", "25");
                operations.opsForValue().increment("user:count");
                
                return operations.exec();
            }
        });
    }
    
    /**
     * 带 WATCH 的乐观锁事务
     * 示例:账户转账
     */
    public boolean transfer(String fromAccount, String toAccount, int amount) {
        String fromKey = "account:" + fromAccount;
        String toKey = "account:" + toAccount;
        
        return Boolean.TRUE.equals(redisTemplate.execute(new SessionCallback<Boolean>() {
            @Override
            public Boolean execute(RedisOperations operations) {
                // 监听转出账户
                operations.watch(fromKey);
                
                // 获取余额
                String balanceStr = (String) operations.opsForValue().get(fromKey);
                int balance = Integer.parseInt(balanceStr);
                
                if (balance < amount) {
                    operations.unwatch();
                    return false; // 余额不足
                }
                
                // 开启事务
                operations.multi();
                operations.opsForValue().decrement(fromKey, amount);
                operations.opsForValue().increment(toKey, amount);
                
                // 执行事务
                List<Object> results = operations.exec();
                return results != null && !results.isEmpty();
            }
        }));
    }
    
    /**
     * 使用 Lua 脚本实现原子操作(推荐)
     */
    public boolean transferWithLua(String fromAccount, String toAccount, int amount) {
        String script = 
            "local from = KEYS[1] " +
            "local to = KEYS[2] " +
            "local amount = tonumber(ARGV[1]) " +
            "local balance = tonumber(redis.call('get', from) or 0) " +
            "if balance >= amount then " +
            "    redis.call('decrby', from, amount) " +
            "    redis.call('incrby', to, amount) " +
            "    return 1 " +
            "else " +
            "    return 0 " +
            "end";
        
        Long result = redisTemplate.execute(
            new DefaultRedisScript<>(script, Long.class),
            Arrays.asList("account:" + fromAccount, "account:" + toAccount),
            String.valueOf(amount)
        );
        
        return result != null && result == 1;
    }
}

二、分布式锁

2.1 分布式锁需求

2.2 基础实现(SETNX)

@Service
public class SimpleDistributedLock {
    
    @Autowired
    private StringRedisTemplate redisTemplate;
    
    private static final String LOCK_PREFIX = "lock:";
    
    /**
     * 获取锁
     * @param lockKey 锁名称
     * @param requestId 请求标识(用于释放锁时验证)
     * @param expireTime 过期时间(秒)
     */
    public boolean tryLock(String lockKey, String requestId, long expireTime) {
        String key = LOCK_PREFIX + lockKey;
        Boolean result = redisTemplate.opsForValue()
            .setIfAbsent(key, requestId, expireTime, TimeUnit.SECONDS);
        return Boolean.TRUE.equals(result);
    }
    
    /**
     * 释放锁(使用 Lua 脚本保证原子性)
     */
    public boolean unlock(String lockKey, String requestId) {
        String key = LOCK_PREFIX + lockKey;
        
        // Lua 脚本:只有持有者才能释放锁
        String script = 
            "if redis.call('get', KEYS[1]) == ARGV[1] then " +
            "    return redis.call('del', KEYS[1]) " +
            "else " +
            "    return 0 " +
            "end";
        
        Long result = redisTemplate.execute(
            new DefaultRedisScript<>(script, Long.class),
            Collections.singletonList(key),
            requestId
        );
        
        return result != null && result == 1;
    }
    
    /**
     * 使用示例
     */
    public void doBusinessWithLock(String orderId) {
        String lockKey = "order:" + orderId;
        String requestId = UUID.randomUUID().toString();
        
        try {
            // 尝试获取锁,最多等待 3 秒
            int retryCount = 0;
            while (!tryLock(lockKey, requestId, 30)) {
                if (++retryCount > 30) {
                    throw new RuntimeException("获取锁超时");
                }
                Thread.sleep(100);
            }
            
            // 执行业务逻辑
            processOrder(orderId);
            
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            // 释放锁
            unlock(lockKey, requestId);
        }
    }
}

2.3 可重入锁实现

@Service
public class ReentrantDistributedLock {
    
    @Autowired
    private StringRedisTemplate redisTemplate;
    
    private static final String LOCK_PREFIX = "lock:";
    
    // 使用 ThreadLocal 存储重入次数
    private ThreadLocal<Map<String, Integer>> lockCountMap = 
        ThreadLocal.withInitial(HashMap::new);
    
    /**
     * 获取可重入锁
     */
    public boolean tryLock(String lockKey, String requestId, long expireTime) {
        String key = LOCK_PREFIX + lockKey;
        Map<String, Integer> countMap = lockCountMap.get();
        
        // 检查是否已持有锁(重入)
        if (countMap.containsKey(key)) {
            countMap.put(key, countMap.get(key) + 1);
            return true;
        }
        
        // 尝试获取锁
        Boolean result = redisTemplate.opsForValue()
            .setIfAbsent(key, requestId, expireTime, TimeUnit.SECONDS);
        
        if (Boolean.TRUE.equals(result)) {
            countMap.put(key, 1);
            return true;
        }
        
        return false;
    }
    
    /**
     * 释放可重入锁
     */
    public boolean unlock(String lockKey, String requestId) {
        String key = LOCK_PREFIX + lockKey;
        Map<String, Integer> countMap = lockCountMap.get();
        
        if (!countMap.containsKey(key)) {
            return false;
        }
        
        int count = countMap.get(key) - 1;
        if (count > 0) {
            // 还有重入层级,不释放锁
            countMap.put(key, count);
            return true;
        }
        
        // 删除计数器
        countMap.remove(key);
        
        // 释放 Redis 锁
        String script = 
            "if redis.call('get', KEYS[1]) == ARGV[1] then " +
            "    return redis.call('del', KEYS[1]) " +
            "else " +
            "    return 0 " +
            "end";
        
        Long result = redisTemplate.execute(
            new DefaultRedisScript<>(script, Long.class),
            Collections.singletonList(key),
            requestId
        );
        
        return result != null && result == 1;
    }
}

2.4 自动续期(看门狗)

@Service
public class WatchDogLock {
    
    @Autowired
    private StringRedisTemplate redisTemplate;
    
    private static final String LOCK_PREFIX = "lock:";
    private static final ScheduledExecutorService scheduler = 
        Executors.newScheduledThreadPool(5);
    
    // 存储续期任务
    private ConcurrentHashMap<String, ScheduledFuture<?>> renewalTasks = 
        new ConcurrentHashMap<>();
    
    /**
     * 获取锁并启动看门狗
     */
    public boolean tryLockWithWatchdog(String lockKey, String requestId, 
            long expireTime, TimeUnit unit) {
        String key = LOCK_PREFIX + lockKey;
        
        Boolean result = redisTemplate.opsForValue()
            .setIfAbsent(key, requestId, expireTime, unit);
        
        if (Boolean.TRUE.equals(result)) {
            // 启动看门狗续期
            startWatchdog(key, requestId, expireTime, unit);
            return true;
        }
        
        return false;
    }
    
    /**
     * 启动看门狗
     */
    private void startWatchdog(String key, String requestId, 
            long expireTime, TimeUnit unit) {
        long expireMs = unit.toMillis(expireTime);
        long renewalInterval = expireMs / 3; // 每 1/3 过期时间续期一次
        
        ScheduledFuture<?> future = scheduler.scheduleAtFixedRate(() -> {
            String script = 
                "if redis.call('get', KEYS[1]) == ARGV[1] then " +
                "    return redis.call('pexpire', KEYS[1], ARGV[2]) " +
                "else " +
                "    return 0 " +
                "end";
            
            Long result = redisTemplate.execute(
                new DefaultRedisScript<>(script, Long.class),
                Collections.singletonList(key),
                requestId, String.valueOf(expireMs)
            );
            
            if (result == null || result == 0) {
                // 锁已失效,停止续期
                stopWatchdog(key);
            }
        }, renewalInterval, renewalInterval, TimeUnit.MILLISECONDS);
        
        renewalTasks.put(key, future);
    }
    
    /**
     * 停止看门狗
     */
    private void stopWatchdog(String key) {
        ScheduledFuture<?> future = renewalTasks.remove(key);
        if (future != null) {
            future.cancel(false);
        }
    }
    
    /**
     * 释放锁并停止看门狗
     */
    public boolean unlock(String lockKey, String requestId) {
        String key = LOCK_PREFIX + lockKey;
        
        // 停止看门狗
        stopWatchdog(key);
        
        // 释放锁
        String script = 
            "if redis.call('get', KEYS[1]) == ARGV[1] then " +
            "    return redis.call('del', KEYS[1]) " +
            "else " +
            "    return 0 " +
            "end";
        
        Long result = redisTemplate.execute(
            new DefaultRedisScript<>(script, Long.class),
            Collections.singletonList(key),
            requestId
        );
        
        return result != null && result == 1;
    }
}

2.5 使用 Redisson(推荐)

Redisson 是 Redis 的 Java 客户端,提供了完善的分布式锁实现。

<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson-spring-boot-starter</artifactId>
    <version>3.23.5</version>
</dependency>
@Configuration
public class RedissonConfig {
    
    @Bean
    public RedissonClient redissonClient() {
        Config config = new Config();
        config.useSingleServer()
            .setAddress("redis://127.0.0.1:6379")
            .setPassword("password")
            .setDatabase(0);
        return Redisson.create(config);
    }
}

@Service
public class RedissonLockService {
    
    @Autowired
    private RedissonClient redissonClient;
    
    /**
     * 基本使用
     */
    public void doWithLock(String lockName) {
        RLock lock = redissonClient.getLock("lock:" + lockName);
        
        try {
            // 尝试获取锁,等待10秒,锁持有30秒
            boolean acquired = lock.tryLock(10, 30, TimeUnit.SECONDS);
            
            if (acquired) {
                // 执行业务逻辑
                doSomething();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            // 只有持有锁的线程才能释放
            if (lock.isHeldByCurrentThread()) {
                lock.unlock();
            }
        }
    }
    
    /**
     * 看门狗自动续期
     * 不设置 leaseTime,Redisson 会自动续期
     */
    public void doWithWatchdog(String lockName) {
        RLock lock = redissonClient.getLock("lock:" + lockName);
        
        try {
            // 不设置 leaseTime,看门狗自动续期(默认30秒,每10秒续期)
            lock.lock();
            
            doSomething();
        } finally {
            lock.unlock();
        }
    }
    
    /**
     * 公平锁(按请求顺序获取锁)
     */
    public void doWithFairLock(String lockName) {
        RLock fairLock = redissonClient.getFairLock("lock:" + lockName);
        
        try {
            fairLock.lock();
            doSomething();
        } finally {
            fairLock.unlock();
        }
    }
    
    /**
     * 读写锁
     */
    public void readWriteLock(String lockName) {
        RReadWriteLock rwLock = redissonClient.getReadWriteLock("rwLock:" + lockName);
        
        // 读锁(共享)
        RLock readLock = rwLock.readLock();
        try {
            readLock.lock();
            // 读取操作
        } finally {
            readLock.unlock();
        }
        
        // 写锁(排他)
        RLock writeLock = rwLock.writeLock();
        try {
            writeLock.lock();
            // 写入操作
        } finally {
            writeLock.unlock();
        }
    }
    
    /**
     * 信号量
     */
    public void semaphoreExample() {
        RSemaphore semaphore = redissonClient.getSemaphore("semaphore:parking");
        semaphore.trySetPermits(100); // 设置100个许可
        
        try {
            // 获取许可
            semaphore.acquire();
            // 执行操作
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            // 释放许可
            semaphore.release();
        }
    }
    
    /**
     * 闭锁(CountDownLatch)
     */
    public void countDownLatchExample() throws InterruptedException {
        RCountDownLatch latch = redissonClient.getCountDownLatch("latch:task");
        latch.trySetCount(5); // 设置计数为5
        
        // 等待计数归零
        latch.await();
        
        // 其他线程调用 countDown
        latch.countDown();
    }
}

三、RedLock(红锁)

3.1 为什么需要 RedLock

单 Redis 实例的分布式锁存在单点故障问题:

3.2 RedLock 算法

核心思想:在多个独立的 Redis 实例上获取锁,只有在多数实例上获取成功才算成功。

3.3 RedLock 算法步骤

3.4 Redisson RedLock 实现

@Configuration
public class RedLockConfig {
    
    @Bean
    public RedissonClient redissonClient1() {
        Config config = new Config();
        config.useSingleServer().setAddress("redis://192.168.1.101:6379");
        return Redisson.create(config);
    }
    
    @Bean
    public RedissonClient redissonClient2() {
        Config config = new Config();
        config.useSingleServer().setAddress("redis://192.168.1.102:6379");
        return Redisson.create(config);
    }
    
    @Bean
    public RedissonClient redissonClient3() {
        Config config = new Config();
        config.useSingleServer().setAddress("redis://192.168.1.103:6379");
        return Redisson.create(config);
    }
}

@Service
public class RedLockService {
    
    @Autowired
    private RedissonClient redissonClient1;
    
    @Autowired
    private RedissonClient redissonClient2;
    
    @Autowired
    private RedissonClient redissonClient3;
    
    /**
     * 使用 RedLock
     */
    public void doWithRedLock(String lockName) {
        // 从不同 Redis 实例获取锁对象
        RLock lock1 = redissonClient1.getLock("lock:" + lockName);
        RLock lock2 = redissonClient2.getLock("lock:" + lockName);
        RLock lock3 = redissonClient3.getLock("lock:" + lockName);
        
        // 创建 RedLock
        RLock redLock = redissonClient1.getMultiLock(lock1, lock2, lock3);
        
        try {
            // 尝试获取锁
            boolean acquired = redLock.tryLock(10, 30, TimeUnit.SECONDS);
            
            if (acquired) {
                // 执行业务逻辑
                doSomething();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            // 释放锁
            redLock.unlock();
        }
    }
    
    /**
     * 红锁实现原理演示
     */
    public boolean tryRedLock(String lockName, String requestId, 
            long ttlMs, int quorum) {
        List<RedissonClient> clients = Arrays.asList(
            redissonClient1, redissonClient2, redissonClient3
        );
        
        long startTime = System.currentTimeMillis();
        int successCount = 0;
        List<RLock> acquiredLocks = new ArrayList<>();
        
        // 依次尝试获取锁
        for (RedissonClient client : clients) {
            RLock lock = client.getLock("lock:" + lockName);
            try {
                boolean acquired = lock.tryLock(
                    ttlMs / clients.size(), // 单个实例超时
                    ttlMs, 
                    TimeUnit.MILLISECONDS
                );
                if (acquired) {
                    successCount++;
                    acquiredLocks.add(lock);
                }
            } catch (Exception e) {
                // 单个实例失败不影响整体
            }
        }
        
        // 计算总耗时
        long elapsedTime = System.currentTimeMillis() - startTime;
        long validTime = ttlMs - elapsedTime;
        
        // 判断是否获取成功
        if (successCount >= quorum && validTime > 0) {
            return true;
        } else {
            // 释放所有已获取的锁
            for (RLock lock : acquiredLocks) {
                try {
                    lock.unlock();
                } catch (Exception e) {
                    // 忽略
                }
            }
            return false;
        }
    }
}

3.5 RedLock 争议

Caution

Martin Kleppmann 的质疑

  1. 时钟跳跃问题:系统时钟不同步可能导致锁提前过期
  2. GC 暂停问题:长时间 GC 可能导致锁过期后仍在执行
  3. 网络延迟问题:获取锁后,网络延迟可能消耗大量有效时间

建议

  • 对于大多数场景,单 Redis 实例 + 主从 + 哨兵已足够
  • 如果需要极高可靠性,考虑使用 Zookeeper 或 etcd

四、Lua 脚本

4.1 为什么使用 Lua 脚本

  • 原子性:整个脚本作为一个原子操作执行
  • 减少网络开销:多个命令合并为一次请求
  • 复用性:脚本可以缓存在服务端

4.2 基本语法

-- 获取参数
-- KEYS[n]:键名参数
-- ARGV[n]:附加参数

-- 调用 Redis 命令
redis.call('SET', KEYS[1], ARGV[1])
local value = redis.call('GET', KEYS[1])

-- 返回值
return value

4.3 常用 Lua 脚本示例

@Service
public class LuaScriptService {
    
    @Autowired
    private StringRedisTemplate redisTemplate;
    
    /**
     * 原子性检查并设置(CAS)
     */
    public boolean compareAndSet(String key, String expect, String update) {
        String script = 
            "if redis.call('get', KEYS[1]) == ARGV[1] then " +
            "    redis.call('set', KEYS[1], ARGV[2]) " +
            "    return 1 " +
            "else " +
            "    return 0 " +
            "end";
        
        Long result = redisTemplate.execute(
            new DefaultRedisScript<>(script, Long.class),
            Collections.singletonList(key),
            expect, update
        );
        
        return result != null && result == 1;
    }
    
    /**
     * 限流:滑动窗口
     */
    public boolean isAllowed(String key, int maxRequests, int windowSeconds) {
        String script = 
            "local key = KEYS[1] " +
            "local max = tonumber(ARGV[1]) " +
            "local window = tonumber(ARGV[2]) " +
            "local now = tonumber(ARGV[3]) " +
            "local window_start = now - window * 1000 " +
            
            // 移除窗口外的请求
            "redis.call('ZREMRANGEBYSCORE', key, 0, window_start) " +
            
            // 获取当前窗口内的请求数
            "local count = redis.call('ZCARD', key) " +
            
            "if count < max then " +
            // 添加当前请求
            "    redis.call('ZADD', key, now, now) " +
            "    redis.call('EXPIRE', key, window) " +
            "    return 1 " +
            "else " +
            "    return 0 " +
            "end";
        
        Long result = redisTemplate.execute(
            new DefaultRedisScript<>(script, Long.class),
            Collections.singletonList(key),
            String.valueOf(maxRequests),
            String.valueOf(windowSeconds),
            String.valueOf(System.currentTimeMillis())
        );
        
        return result != null && result == 1;
    }
    
    /**
     * 库存扣减
     */
    public boolean decrStock(String productId, int quantity) {
        String script = 
            "local stock = tonumber(redis.call('get', KEYS[1]) or 0) " +
            "local quantity = tonumber(ARGV[1]) " +
            "if stock >= quantity then " +
            "    redis.call('decrby', KEYS[1], quantity) " +
            "    return stock - quantity " +
            "else " +
            "    return -1 " +
            "end";
        
        Long result = redisTemplate.execute(
            new DefaultRedisScript<>(script, Long.class),
            Collections.singletonList("stock:" + productId),
            String.valueOf(quantity)
        );
        
        return result != null && result >= 0;
    }
    
    /**
     * 批量删除匹配的 Key
     */
    public long deleteByPattern(String pattern) {
        String script = 
            "local keys = redis.call('keys', ARGV[1]) " +
            "local count = 0 " +
            "for i, key in ipairs(keys) do " +
            "    redis.call('del', key) " +
            "    count = count + 1 " +
            "end " +
            "return count";
        
        Long result = redisTemplate.execute(
            new DefaultRedisScript<>(script, Long.class),
            Collections.emptyList(),
            pattern
        );
        
        return result != null ? result : 0;
    }
    
    /**
     * 使用脚本缓存(EVALSHA)
     */
    private String cachedScriptSha;
    
    public void initScriptCache() {
        String script = "return redis.call('get', KEYS[1])";
        cachedScriptSha = redisTemplate.execute((RedisCallback<String>) connection -> 
            connection.scriptLoad(script.getBytes())
        );
    }
    
    public String evalCachedScript(String key) {
        return redisTemplate.execute((RedisCallback<String>) connection -> {
            byte[] result = connection.evalSha(
                cachedScriptSha,
                ReturnType.VALUE,
                1,
                key.getBytes()
            );
            return result != null ? new String(result) : null;
        });
    }
}

五、其他高级特性

5.1 Pipeline(管道)

@Service
public class PipelineService {
    
    @Autowired
    private StringRedisTemplate redisTemplate;
    
    /**
     * 批量操作使用 Pipeline
     */
    public void batchSetWithPipeline(Map<String, String> data) {
        redisTemplate.executePipelined(new SessionCallback<Object>() {
            @Override
            public Object execute(RedisOperations operations) {
                data.forEach((key, value) -> 
                    operations.opsForValue().set(key, value)
                );
                return null;
            }
        });
    }
    
    /**
     * 批量获取使用 Pipeline
     */
    public List<Object> batchGetWithPipeline(List<String> keys) {
        return redisTemplate.executePipelined(new SessionCallback<Object>() {
            @Override
            public Object execute(RedisOperations operations) {
                keys.forEach(key -> operations.opsForValue().get(key));
                return null;
            }
        });
    }
}

5.2 发布订阅(Pub/Sub)

// 消息监听器
@Component
public class RedisMessageListener implements MessageListener {
    
    @Override
    public void onMessage(Message message, byte[] pattern) {
        String channel = new String(message.getChannel());
        String body = new String(message.getBody());
        System.out.println("收到消息 - 频道: " + channel + ", 内容: " + body);
    }
}

// 配置订阅
@Configuration
public class RedisPubSubConfig {
    
    @Bean
    public RedisMessageListenerContainer container(
            RedisConnectionFactory factory,
            RedisMessageListener listener) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(factory);
        container.addMessageListener(listener, new PatternTopic("news.*"));
        return container;
    }
}

// 发布消息
@Service
public class MessagePublisher {
    
    @Autowired
    private StringRedisTemplate redisTemplate;
    
    public void publish(String channel, String message) {
        redisTemplate.convertAndSend(channel, message);
    }
}

5.3 Stream(消息队列 - Redis 5.0+)

@Service
public class RedisStreamService {
    
    @Autowired
    private StringRedisTemplate redisTemplate;
    
    /**
     * 发送消息到 Stream
     */
    public String sendMessage(String streamKey, Map<String, String> message) {
        RecordId recordId = redisTemplate.opsForStream()
            .add(streamKey, message);
        return recordId.toString();
    }
    
    /**
     * 创建消费者组
     */
    public void createConsumerGroup(String streamKey, String groupName) {
        try {
            redisTemplate.opsForStream()
                .createGroup(streamKey, groupName);
        } catch (Exception e) {
            // 组已存在
        }
    }
    
    /**
     * 消费消息
     */
    public List<MapRecord<String, Object, Object>> consumeMessages(
            String streamKey, String groupName, String consumerName, int count) {
        
        return redisTemplate.opsForStream().read(
            Consumer.from(groupName, consumerName),
            StreamReadOptions.empty().count(count).block(Duration.ofSeconds(2)),
            StreamOffset.create(streamKey, ReadOffset.lastConsumed())
        );
    }
    
    /**
     * 确认消息
     */
    public void ackMessage(String streamKey, String groupName, String recordId) {
        redisTemplate.opsForStream()
            .acknowledge(streamKey, groupName, recordId);
    }
}

六、总结

功能选择建议

分布式锁对比

方案实现复杂度可靠性性能推荐场景
SETNX简单场景
Redisson大多数场景
RedLock较高极高可靠性
Zookeeper最高金融级场景