"小王,咱们的秒杀系统又崩了!"凌晨2点,运维同事的电话把王工程师从睡梦中惊醒,打开监控系统,他发现同一个iPhone15商品被超卖了37台——这已经是本月第三次出现库存超卖问题了。
在电商、金融支付等对数据一致性要求极高的场景中,传统的单机锁在分布式环境下完全失效,想象一下,当1000个用户同时点击"立即抢购",如何确保库存只减少1000而不是更多?这就是分布式锁要解决的核心问题。
Redis之所以成为分布式锁的首选,得益于三个特性:
但要注意,Redis锁不是银弹,我在2023年参与的一个社交APP项目中,就曾因错误使用Redis锁导致整个私信系统雪崩,这提醒我们:技术选型必须匹配业务场景。
import redis import time class RedisLock: def __init__(self, redis_client): self.redis = redis_client def acquire(self, lock_key, expire=30): """ 获取分布式锁 :param lock_key: 锁键名 :param expire: 锁过期时间(秒) :return: 是否获取成功 """ identifier = str(time.time()) # 唯一标识 if self.redis.setnx(lock_key, identifier): self.redis.expire(lock_key, expire) return identifier return False def release(self, lock_key, identifier): """ 释放锁 :param lock_key: 锁键名 :param identifier: 获取锁时返回的标识 """ # 确保不会误删其他客户端的锁 if self.redis.get(lock_key) == identifier: self.redis.delete(lock_key)
这个基础版本存在明显缺陷——获取锁和设置过期时间不是原子操作,如果在setnx之后程序崩溃,会导致死锁。
Redis 2.6.12后支持SET命令扩展参数:
def safe_acquire(self, lock_key, expire=30): identifier = str(time.time()) # 原子性设置键值+过期时间 result = self.redis.set( lock_key, identifier, nx=True, # 不存在时才设置 ex=expire # 设置过期时间 ) return identifier if result else False
当需要更高可靠性时,可以采用Redis官方推荐的Redlock算法,该算法需要至少5个独立的Redis主节点:
import random class RedLock: def __init__(self, redis_nodes): self.quorum = len(redis_nodes) // 2 + 1 self.nodes = redis_nodes def lock(self, resource, ttl): identifier = str(random.random()) start_time = time.time() * 1000 locked_nodes = 0 for node in self.nodes: try: if node.set(resource, identifier, nx=True, px=ttl): locked_nodes += 1 except: continue elapsed = time.time() * 1000 - start_time if locked_nodes >= self.quorum and elapsed < ttl: return { 'validity': ttl - elapsed, 'resource': resource, 'identifier': identifier } # 失败时释放已获得的锁 self.unlock({'resource': resource, 'identifier': identifier}) return False
去年我们系统出现过这样的情况:某个订单处理任务耗时超过锁过期时间,导致锁自动释放后,另一个进程获取到锁并开始处理同一订单,最终解决方案:
# 动态续期示例 def renew_lock(lock_key, identifier, expire=30): while True: if redis.get(lock_key) == identifier: redis.expire(lock_key, expire) time.sleep(expire * 0.8) # 在过期前续期 else: break
建议设置合理的超时时间,通常为业务平均耗时的2-3倍。
直接失败还是阻塞等待?不同场景需要不同策略:
# 非阻塞式 if not lock.acquire("order_123"): return "系统繁忙,请稍后再试" # 阻塞式(带超时) start = time.time() timeout = 5 # 秒 while time.time() - start < timeout: if lock.acquire("order_123"): break time.sleep(0.1) else: raise Exception("获取锁超时")
过粗的锁会导致性能下降,在某内容审核系统中,我们最初使用全局锁,QPS只能到200,优化后按内容ID加锁,QPS提升至5000+:
# 不推荐 - 全局锁 lock_key = "content_review_lock" # 推荐 - 细粒度锁 lock_key = f"content_review_lock_{content_id}"
def deduct_stock(item_id, count): lock_key = f"inventory_lock_{item_id}" identifier = lock.acquire(lock_key) if not identifier: raise Exception("操作过于频繁") try: stock = db.query("SELECT stock FROM inventory WHERE item_id = %s", item_id) if stock >= count: db.execute("UPDATE inventory SET stock = stock - %s WHERE item_id = %s", count, item_id) finally: lock.release(lock_key, identifier)
def scheduled_task(): lock_key = "task_lock_clean_expired_data" if not lock.acquire(lock_key, expire=3600): print("已有其他节点在执行该任务") return try: # 执行清理逻辑 clean_expired_data() finally: # 注意:长时间任务可能需要手动释放 lock.release(lock_key, identifier)
在以下场景可能需要考虑其他方案:
去年我们就在一个银行结算系统中用Zookeeper替代了Redis锁,因为金融场景对一致性的要求高于性能。
Redis分布式锁就像交通信号灯——不是要完全阻止车辆通行,而是让通行变得有序,经过多次实战,我总结了Redis锁的选用原则:
分布式系统的复杂性往往超出预期,建议你在上线前做好:
技术没有最好,只有最合适,希望这篇来自2025年的经验总结,能帮助你在分布式锁的迷雾中找到方向。
本文由 毋凝竹 于2025-07-31发表在【云服务器提供商】,文中图片由(毋凝竹)上传,本平台仅提供信息存储服务;作者观点、意见不代表本站立场,如有侵权,请联系我们删除;若有图片侵权,请您准备原始证明材料和公证书后联系我方删除!
本文链接:https://vps.7tqx.com/wenda/499324.html
发表评论