当前位置:首页 > 问答 > 正文

分布式锁|高并发环境下Redis实现的锁机制控制与redis锁控制应用

Redis分布式锁实战指南

场景引入:电商秒杀的锁之痛

"小王,咱们的秒杀系统又崩了!"凌晨2点,运维同事的电话把王工程师从睡梦中惊醒,打开监控系统,他发现同一个iPhone15商品被超卖了37台——这已经是本月第三次出现库存超卖问题了。

在电商、金融支付等对数据一致性要求极高的场景中,传统的单机锁在分布式环境下完全失效,想象一下,当1000个用户同时点击"立即抢购",如何确保库存只减少1000而不是更多?这就是分布式锁要解决的核心问题。

Redis分布式锁的本质

Redis之所以成为分布式锁的首选,得益于三个特性:

  1. 单线程执行:命令串行处理,天然避免并发问题
  2. 高性能:每秒10万+的QPS足以应对大多数高并发场景
  3. 原子操作:SETNX、Lua脚本等特性完美契合锁需求

但要注意,Redis锁不是银弹,我在2023年参与的一个社交APP项目中,就曾因错误使用Redis锁导致整个私信系统雪崩,这提醒我们:技术选型必须匹配业务场景。

手把手实现Redis锁

基础版:SETNX实现

import redis
import time
class RedisLock:
    def __init__(self, redis_client):
        self.redis = redis_client
    def acquire(self, lock_key, expire=30):
        """
        获取分布式锁
        :param lock_key: 锁键名
        :param expire: 锁过期时间(秒)
        :return: 是否获取成功
        """
        identifier = str(time.time())  # 唯一标识
        if self.redis.setnx(lock_key, identifier):
            self.redis.expire(lock_key, expire)
            return identifier
        return False
    def release(self, lock_key, identifier):
        """
        释放锁
        :param lock_key: 锁键名
        :param identifier: 获取锁时返回的标识
        """
        # 确保不会误删其他客户端的锁
        if self.redis.get(lock_key) == identifier:
            self.redis.delete(lock_key)

这个基础版本存在明显缺陷——获取锁和设置过期时间不是原子操作,如果在setnx之后程序崩溃,会导致死锁。

分布式锁|高并发环境下Redis实现的锁机制控制与redis锁控制应用

进阶版:原子操作优化

Redis 2.6.12后支持SET命令扩展参数:

def safe_acquire(self, lock_key, expire=30):
    identifier = str(time.time())
    # 原子性设置键值+过期时间
    result = self.redis.set(
        lock_key, 
        identifier, 
        nx=True,  # 不存在时才设置
        ex=expire # 设置过期时间
    )
    return identifier if result else False

高可用方案:Redlock算法

当需要更高可靠性时,可以采用Redis官方推荐的Redlock算法,该算法需要至少5个独立的Redis主节点:

  1. 获取当前毫秒级时间戳T1
  2. 依次向所有节点请求加锁(相同key和随机值)
  3. 当从多数节点(N/2+1)获取锁成功,且总耗时小于锁有效期时才算成功
  4. 锁的实际有效时间 = 初始有效时间 - 获取锁总耗时
import random
class RedLock:
    def __init__(self, redis_nodes):
        self.quorum = len(redis_nodes) // 2 + 1
        self.nodes = redis_nodes
    def lock(self, resource, ttl):
        identifier = str(random.random())
        start_time = time.time() * 1000
        locked_nodes = 0
        for node in self.nodes:
            try:
                if node.set(resource, identifier, nx=True, px=ttl):
                    locked_nodes += 1
            except:
                continue
        elapsed = time.time() * 1000 - start_time
        if locked_nodes >= self.quorum and elapsed < ttl:
            return {
                'validity': ttl - elapsed,
                'resource': resource,
                'identifier': identifier
            }
        # 失败时释放已获得的锁
        self.unlock({'resource': resource, 'identifier': identifier})
        return False

生产环境中的避坑指南

锁过期时间设置

去年我们系统出现过这样的情况:某个订单处理任务耗时超过锁过期时间,导致锁自动释放后,另一个进程获取到锁并开始处理同一订单,最终解决方案:

# 动态续期示例
def renew_lock(lock_key, identifier, expire=30):
    while True:
        if redis.get(lock_key) == identifier:
            redis.expire(lock_key, expire)
            time.sleep(expire * 0.8)  # 在过期前续期
        else:
            break

建议设置合理的超时时间,通常为业务平均耗时的2-3倍。

分布式锁|高并发环境下Redis实现的锁机制控制与redis锁控制应用

锁等待策略

直接失败还是阻塞等待?不同场景需要不同策略:

# 非阻塞式
if not lock.acquire("order_123"):
    return "系统繁忙,请稍后再试"
# 阻塞式(带超时)
start = time.time()
timeout = 5  # 秒
while time.time() - start < timeout:
    if lock.acquire("order_123"):
        break
    time.sleep(0.1)
else:
    raise Exception("获取锁超时")

锁粒度控制

过粗的锁会导致性能下降,在某内容审核系统中,我们最初使用全局锁,QPS只能到200,优化后按内容ID加锁,QPS提升至5000+:

# 不推荐 - 全局锁
lock_key = "content_review_lock"
# 推荐 - 细粒度锁
lock_key = f"content_review_lock_{content_id}"

典型应用场景解析

案例1:库存扣减

def deduct_stock(item_id, count):
    lock_key = f"inventory_lock_{item_id}"
    identifier = lock.acquire(lock_key)
    if not identifier:
        raise Exception("操作过于频繁")
    try:
        stock = db.query("SELECT stock FROM inventory WHERE item_id = %s", item_id)
        if stock >= count:
            db.execute("UPDATE inventory SET stock = stock - %s WHERE item_id = %s", 
                       count, item_id)
    finally:
        lock.release(lock_key, identifier)

案例2:定时任务防重复执行

def scheduled_task():
    lock_key = "task_lock_clean_expired_data"
    if not lock.acquire(lock_key, expire=3600):
        print("已有其他节点在执行该任务")
        return
    try:
        # 执行清理逻辑
        clean_expired_data()
    finally:
        # 注意:长时间任务可能需要手动释放
        lock.release(lock_key, identifier)

Redis锁的局限性

在以下场景可能需要考虑其他方案:

  1. 长时间任务:锁过期时间难以预估
  2. 严格一致性要求:Redis异步复制可能导致锁状态不一致
  3. 超高并发:Redlock算法性能开销较大

去年我们就在一个银行结算系统中用Zookeeper替代了Redis锁,因为金融场景对一致性的要求高于性能。

分布式锁|高并发环境下Redis实现的锁机制控制与redis锁控制应用

没有完美的锁,只有合适的锁

Redis分布式锁就像交通信号灯——不是要完全阻止车辆通行,而是让通行变得有序,经过多次实战,我总结了Redis锁的选用原则:

  1. 先评估业务场景的并发量和一致性要求
  2. 简单场景用单Redis节点+SET NX EX
  3. 高可用场景用Redlock
  4. 极端场景考虑Zookeeper/etcd等强一致性方案

分布式系统的复杂性往往超出预期,建议你在上线前做好:

  • 压力测试(模拟锁竞争)
  • 混沌工程(模拟节点宕机)
  • 详细监控(锁等待时间、获取失败率等指标)

技术没有最好,只有最合适,希望这篇来自2025年的经验总结,能帮助你在分布式锁的迷雾中找到方向。

发表评论