使用Redis构建分布式锁

在软件开发中,锁是常见的一种业务处理方式,用途也很广泛。锁给我们提供了一种能够被多个线程访问的共享内存数据结构,在使用锁时,通常有着以下步骤“获取锁、然后执行操作、最后释放锁”。

最初的锁实现中,是提供给同一进程内的多个线程进行共享访问的,因为最初往往程序都是单机部署,而随着互联网技术的发展,分布式部署已成了主流,同一应用往往部署在多个不同的服务器中。

能让分布在不同服务器的进程同时访问的锁,称为分布式锁。分布式锁有多种实现方式,这里只介绍其中的一种:利用Redis实现分布式锁。

简易锁

大部分Redis用户对锁(lock)、加锁(locking)、锁超时(lock timeout)有所了解,但确不一定能够正确使用,都是因为未能正确考虑以下几种故障和异常情况:

  • 持有锁的进程因为操作时间过长而导致锁被自动释放,但进程本身并不知道,那么当该线程对锁进行释放时就有可能会将其他进程持有的锁进行释放;
  • 当前持有锁的进程在进行长时间操作时崩溃,导致锁未能正常释放,而其他想要获取锁的线程并不知道哪个线程持有锁,也无法知晓持有锁的线程已经崩溃,因此只能等待锁过期释放;
  • 当一个进程持有的锁过期后,多个线程同时尝试去获取锁,且都获取了锁;
  • 上述第一、第三种情况同时出现,导致多个线程获得了锁,而每个线程都以为自己是唯一获得锁的线程;

如今稍微好点硬件能够让Redis在服务器上每秒执行几十万次操作,虽说上述情况出现的几率较小,但在高负载时出现的几率会大大提高,因此需要编写能正确工作的锁。

正确的构建锁

实际上只要稍微注意下,写出一个能够正确工作的Redis锁并不是什么难事。因为大部分情况下,一般的实现都能正常工作。

在redis的函数中,setnx就特别适合来实现锁的功能,它只有在Key不存在时才会设置当前内容。如果key已经存在则返回失败。

来看下面的伪代码:

public string AcquireLock(locKName, acquireTimeout){
    锁标识lockid = GUID
    endtime = now + acquireTimeout
    while(now < endtime){
        if setnx(lockName, lockid)
            return lockid
        
        sleep(0.01)
    }

    return string.empty
}

代码很容易理解:首先使用GUID来唯一标识当前锁,然后在有效时间内使用while循环尝试去获取锁,如果setnx返回成功,则说明当前锁设置成功,返回当前GUID供后续释放锁使用。

如果setnx失败,则短暂休眠后继续尝试获取锁,直到成功或者超时。

你可能注意到了,当前方式获取的锁是不带过期时间的,也就是说一旦第一个线程设置锁的时候,后续的其他获取锁的操作都没法成功。这显然不是我们想要的,但这种锁也有它适合应用的场景,比如笔者常用的取某些序列号的场景,当某一类型的序列值在redis存在时,对其increment后返回,而如果不存在则设置第一个初始化的值,设置之后该值永远有效。

回到我们使用线程锁的场景,要完善上述方法,只需从两个方面考虑:

* 每次获取锁,完成业务操作后,手动释放锁
* 给锁加上过期时间

下面分别来讲解两种方式。

释放锁

开头我们已经说过,常见业务是“获取锁、业务操作、释放锁”的过程。因为如果获取锁的线程在业务操作处理完成之后不手动对锁进行释放的话,可能因此其他线程无效的等待或者说永远等待。因此释放锁的操作也显得很重要。

释放锁需要注意的是,当前线程只能释放自己获取的锁,不能释放掉其他线程的锁。也许有人会说自己时唯一获取锁的线程,怎么可能会释放掉其他线程的锁呢?

实际上这有两个条件:

  • 当自身线程获取锁时的确是唯一获取锁的线程,但是因为锁的过期时间短,而业务操作时间长,因此导致了当你释放时该锁已经被redis自动释放,而其他线程获取了锁;
  • 可能并不是只有当前自身线程获取了锁,而是有多个线程同时获取了锁

因此在获取锁时,返回的guid表示当前锁的唯一标识便显得很有必要了。

来看伪代码:

public bool ReleaseLock(lockName, lockid){
    try{
        watch(lockname)
        if get(lockname) == lockid
            return del(lockname)
        unwatch(lockname)
    }
    catch{
        return false
    }
}

redis中使用watch来监视key的变化,在watch/unwatch块中的操作,如果key发生了变化,则会抛出异常,否则表示在当前块中的操作,key值都不会变化。因此实现了类似数据库事务的效果,保证在get和del中间key不会变化,因此不会释放掉其他线程的锁。

使用StackExchange.Redis实现的C#版本代码如下:

    /// <summary>
    /// 释放锁
    /// </summary>
    /// <param name="lockName">锁名称</param>
    /// <param name="lockId">锁当前的值</param>
    /// <returns></returns>
    public static bool ReleaseLock(string lockName, string lockId)
    {
        var newId = Guid.NewGuid().ToString();
        var tran = _db.CreateTransaction();
        tran.AddCondition(Condition.StringEqual(lockName, lockId));
        tran.KeyDeleteAsync(lockName);
        return tran.Execute();
    }

在StackExchange.Redis中,我们可以使用CreateTransaction实现事务功能,只有trans中AddCondition条件为true时,整个事务才会提交提交。

为了保证不会删除别的线程锁设置的锁,这里使用Condition.StringEqual(lockName, lockId)来判断。

带超时限制的锁

由于上面不带超时的锁,一旦设置后其余线程都无法获取锁,在业务上很明显不合适。 所以我们给每次获取的锁设置超时时间,保证即使当前获取锁的线程未能手动释放锁(如异常终止)的情况下,锁仍然可以在一定时间内失效,保证后续业务正常执行。

实现起来很简单,就是在成功获取锁之后,直接设置一个过期时间即可。

伪代码如下:

public string AcquireLock(locKName, acquireTimeout, lockTimeout){
    锁标识lockid = GUID
    endtime = now + acquireTimeout
    while(now < endtime){
        if setnx(lockName, lockid){
            expire(lockName, lockTimeout)
            return lockid
        }
        
        sleep(0.01)
    }

    return string.empty
}

这样看起来好像已经很完美了,可以直接跑生产了。但是仔细一想,真的是这样吗?

我们来分析下代码:

如果程序在setnx成功后,在执行exipre之前出现异常终止,那么出现的情况就是锁设置成功了,但是没有超时时间!

因此,后续所有的获取锁的操作都无法成功,业务将会被阻断,这将是灾难性的bug。

改进版本

要解决上述的问题,就是在获取锁失败时,判断下锁是否有过期时间,如果没有,则设置它。这样就能保证锁在下一个周期段内一定能被释放,后续的操作能够正常执行。

伪代码:

public string AcquireLock(locKName, acquireTimeout, lockTimeout){
    锁标识lockid = GUID
    endtime = now + acquireTimeout
    while(now < endtime){
        if setnx(lockName, lockid){
            expire(lockName, lockTimeout)
            return lockid
        }else {
            if -1 == ttl(lockName) 
                expire(lockName, lockTimeout)
        }
        
        sleep(0.01)
    }

    return string.empty
}

ttl命令能够返回key的超时时间,这是一个正数。但当key不存在时返回-2,而key没有设置过期时间时返回-1。

使用StackExchange.Redis实现的C#版本代码如下:

    /// <summary>
    /// 获取锁
    /// </summary>
    /// <param name="lockName">锁名称</param>
    /// <param name="acquireTimeout">获取锁操作的超时时间</param>
    /// <param name="lockTimeout">设置锁的有效时间</param>
    /// <returns></returns>
    public static string AcquireLock(string lockName, int acquireTimeout, int lockTimeout)
    {
        string lockId = Guid.NewGuid().ToString();
        DateTime endTime = DateTime.Now.AddMilliseconds(acquireTimeout);
        while (DateTime.Now < endTime)
        {
            if (StringSetIfNotExist(lockName, lockId, TimeSpan.FromMilliseconds(lockTimeout)))
            {
                return lockId;
            }
            else
            {
                TimeSpan? ttl = _db.KeyTimeToLive(lockName);
                if (ttl == null)
                {
                    _db.KeyExpire(lockName, TimeSpan.FromMilliseconds(lockTimeout));
                    break;
                }
            }

            Thread.Sleep(1);
        }

        return string.Empty;
    }

在这里,我添加了方法StringSetIfNotExist来实现只有在key不存在时才设置的功能,这个的操作是使用StackExchange.Redis的事务功能实现的,其对应的Redis本质还是Watch命令等。

    /// <summary>
    /// 当key不存在时才设置,若存在返回false.
    /// </summary>
    /// <param name="key"></param>
    /// <param name="value"></param>
    /// <param name="expireTime"></param>
    /// <returns></returns>
    public static bool StringSetIfNotExist(string key, string value, TimeSpan expireTime)
    {
        var newId = Guid.NewGuid().ToString();
        var tran = _db.CreateTransaction();
        tran.AddCondition(Condition.KeyNotExists(key));
        tran.StringSetAsync(key, value, expireTime);
        return tran.Execute();
    }

总结

本篇主要讲述了如何使用redis实现分布式锁,开始说明了使用redis锁容易带来的几个问题,接着针对这几个问题分别介绍了如何实现简易锁、带超时的锁及如何安全的释放锁。

整个类文件:RedisHelper.cs

参考

  • 《Redis实战》Josiah L. Carlson 著,黄健宏 译.

标签: redis, , 分布式锁, StackExchange.Redis

添加新评论