nginx 文件锁、自旋锁的实现


在上一篇博客 Linux 共享内存以及 nginx 中的实现的示例中,我们看到每次多个进程同时对共享内存中的 count 加一,导致每次运行结果都不一样,那么解决的方法就是对临界区加锁了,所以本文就来研究一下 nginx 中的几种加锁方式。

文件锁

文件锁的原理就是在磁盘上创建一个文件(操作系统创建一个文件描述符),然后多个进程竞争去获取这个文件的访问权限,因此同一时刻只有一个进程能够访问临界区。

可以看出,进程并不会真正在这个文件中写什么东西,我们只是想要一个文件描述符 FD 而已,因此 nginx 会在创建了文件后把这个文件删除,只留下文件描述符。

多个进程打开同一个文件,各个进程看到的文件描述 FD 值可能会不一样。例如文件 test.txt 在 进程1 中是 101, 而在进程2中是 201

使用文件锁举例

使用文件锁主要用到两个 libc 提供的结构体和函数。

  • struct flock; 提供一些锁的基本信息,比如读锁 F_RDLCK, 还是写锁 F_WRLCK
  • fcntl(): 对文件描述符进行操作的库函数。

那么如何用这个函数来实现锁的功能呢?

先看一个加锁的代码:

void mtx_file_lock(struct fdmutex *m)
{
    struct flock fl;
    memset(&fl, 0, sizeof(struct flock));

    fl.l_type = F_WRLCK;
    fl.l_whence = SEEK_SET;

    if (fcntl(m->fd, F_SETLKW, &fl) == -1) {
        printf("[-] PID %d, lock failed (%s).\n", getpid(), strerror(errno));
    }
}

再来看一个解锁的代码:

void mtx_file_unlock(struct fdmutex *m)
{
    struct flock fl;
    memset(&fl, 0, sizeof(struct flock));

    fl.l_type = F_UNLCK;
    fl.l_whence = SEEK_SET;

    if (fcntl(m->fd, F_SETLK, &fl) == -1) {
        printf("[-] PID %d, unlock failed (%s).\n", getpid(), strerror(errno));
    }
}

遇到临界区,只要在需要进行保护的代码前后加上上面的 lock() 和 unlock() 函数就可以了。

下面来运行一下完整的代码,源代码包含三个文件 Makefile,set, get

  • set : 对共享内存中的 count 执行 10000 次加一。
  • Makefile : 开启多个 set 程序,因此多个 set 会同时对共享内存的 count 操作。
  • get : 等多个 set 运行完成后,运行 get 获取共享内存中的值。

在上一篇博客 Linux 共享内存以及 nginx 中的实现 中我们已经看到,不加锁的情况下,每次的结果都不同;现在我们对 for 循环的前后都加上文件锁,再来看看结果:

先用 make 命令生成可执行文件,

$ make
gcc -o set set.c -lrt -lpthread
gcc -o get get.c -lrt -lpthread

然后执行 make run, 开启 6 个 set 进程同时运行,再 sleep 5 秒钟,这是为了让 set 有充分的时间去执行完毕,最后再用 get 进程去获取共享内存里面的 count 值。

$ make run
[+] PID 21314 start
[+] PID 21316 start
[+] PID 21318 start
[+] PID 21320 start
[+] PID 21322 start
sleep 5
[+] PID 21324 start
./GET pid 21326: Read from shared memory: 60000

多运行几次,可以发现每次的结果都是 60000,证明文件锁对临界区起到了保护作用。

如果看到运行的结果不是 60000,可以修改 Makefile 把 sleep 的时间增大,确保所有的 set 都执行完毕再调用 get。

另一个方法是修改 Makefile 的 run 部分,不自动运行 get 程序,只执行多个 set;然后用 ps aux | grep set 查看系统中有多少个 set 还在运行,确保没有之后再手动执行 ./get

基于原子操作的锁

文件锁的操作效率不及原子锁,原子锁是利用 CPU 提供的原子操作功能来实现锁,比如 Compare-and-Swap 指令 cmpxchgl, 可见效率上更胜一筹。

不过,CPU 只提供了汇编级别的操作指令,我们的 C 语言程序想要用原子操作,要么自己实现,要么调用库函数。在 nginx 的实现中,针对不同的平台、不同的函数库都有相应的处理。

在本文的例子中,为了简单起见,直接使用了 nginx 的 ngx_atomic_cmp_set() 在 x86 架构上的代码,位于 /src/os/unix/ngx_gcc_atomic_x86.h

自旋锁的简单实现

完整的代码请在这里下载

有了自己的 Compare-and-Swap 函数后,就可以实现 lock 和 unlock。另外有一点很重要:因为是对多个进程进行互斥,所以锁的结构必须保存在共享内存区域,这一点不难理解,如果放在进程的私有空间,那么各个进程各玩各的,起不到互斥的作用。

下面是 加锁函数,

void atomic_lock(struct shared_area *m, uint64_t pid)
{
    for (;;) {
        if (m->lock == 0 && atomic_cmp_set(&m->lock, 0, pid)) {
            return;
        }
    }
}

可以看出,上面的加锁代码直接使用了一个 for 循环,当无法获取锁时,无限等待,所以这是一个 自旋锁。为了避免无限等待,nginx 中会用信号量等技巧作一些避免,具体请参考 src/core/ngx_shmtx.c

下面是解锁代码

void atomic_unlock(struct shared_area *m, uint64_t pid)
{
    atomic_cmp_set(&m->lock, pid, 0);
}

最后,我们的测试代码与文件锁的非常类似,只是把加锁解锁函数换成了原子锁实现而已,运行的结果也非常类似:

$ make run
[+] SET PID 21641 start
[+] SET PID 21643 start
[+] SET PID 21645 start
[+] SET PID 21647 start
[+] SET PID 21649 start
sleep 5
[+] SET PID 21651 start
./GET pid 21653: Read from shared memory: 60000

如果结果不是 60000,修改 Makefile 把 sleep 时间增大,确保所有 set 执行完毕再调用 get。

nginx 中的实现

先来总结一下 nginx 中的锁,主要是两大类:

  1. 文件锁:根据传给 fcntl() 的参数,又可以加锁时阻塞和非阻塞
    • F_SETLKW,表示获取不到文件锁时,阻塞直到可以获取
    • F_SETLK, 获取不到锁时会直接返回,不会阻塞进程。因为会直接返回,所以需要在外部在包装一个 ngx_shmtx_trylock() 函数。
  2. 原子锁:用库函数或者nginx实现,取决于 configure 脚本生成的宏定义。
    • 也分为阻塞的和非阻塞的,但是这与锁本身的实现没有关系,而是靠额外的信号量来实现阻塞。

关于源代码的分析,网上有很多资料(比如参考资料1),下面是我的整理,

ngx_shmtx_create(ngx_shmtx_t *mtx, ngx_shmtx_sh_t *addr, u_char *name)
{
    mtx->lock = &addr->lock;
    // 不支持信号量时,spin 表示锁的自旋次数,为0或负数表示不进行自旋,直接让出cpu,
    // 支持信号量时,-1 表示不要使用信号量将进程置于睡眠状态
    if (mtx->spin == (ngx_uint_t) -1) {
        return NGX_OK;
    }
    // 默认自旋次数
    mtx->spin = 2048;
    
#if (NGX_HAVE_POSIX_SEM)
    mtx->wait = &addr->wait;
    //初始化信号量,第二个参数1表示,信号量使用在多进程环境中,第三个参数0表示信号量的初始值
    //当信号量的值小于等于0时,尝试等待信号量会阻塞
    //当信号量大于0时,尝试等待信号量会成功,并把信号量的值减一。
    if (sem_init(&mtx->sem, 1, 0) == -1) {
    } else {
        mtx->semaphore = 1;
    }
#endif
    return NGX_OK;
}

阻塞式的锁

这是唯一理解起来麻烦一点的地方。总的来说,我们的目标是原子操作的互斥锁,阻塞式的,那么阻塞怎么实现呢?

如果系统不支持信号量,那么获取不到锁的时候,我们让他自旋一会儿,这与我自己的实现是一样的,只不过我用了无限 for 循环,粗暴。而 nginx 的实现则优雅一点,用 mtx->spin 的值指定重试的最大次数。

如果系统支持信号量,那么就不用 spin 自旋了,从ngx_shmtx_create() 函数就能看出来,直接用系统的信号量实现阻塞。

同一时间,二者选其一就行了。 不过信号量的效率没有自旋好,所以一般不使用。例如 ngx_accept_mutex 就直接把 spin 设置为 -1 了。

void
ngx_shmtx_lock(ngx_shmtx_t *mtx)
{
    for ( ;; ) {
        //注意:由于在多进程环境下执行,*mtx->lock == 0 为真时,并不能确保后面的原子操作执行成功
        if (*mtx->lock == 0 && ngx_atomic_cmp_set(mtx->lock, 0, ngx_pid)) {
            return;
        }
        // 获取锁失败了,这时候判断cpu的数目,如果数目大于1,则先自旋一段时间,然后再让出cpu
        // 如果cpu数目为1,则没必要进行自旋了,应该直接让出cpu给其他进程执行。
        if (ngx_ncpu > 1) {
            for (n = 1; n < mtx->spin; n <<= 1) {
                for (i = 0; i < n; i++) {
                    // ngx_cpu_pause函数并不是真的将程序暂停,
                       而是为了提升循环等待时的性能,并且可以降低系统功耗。
                    // 实现它时往往是一个指令: `__asm__`("pause")
                    ngx_cpu_pause();
                }
                
                // 再次尝试获取锁
                if (*mtx->lock == 0
                    && ngx_atomic_cmp_set(mtx->lock, 0, ngx_pid))
                {
                    return;
                }
            }
        }
        // 支持信号量
#if (NGX_HAVE_POSIX_SEM)
        // 上面自旋次数已经达到,依然没有获取锁,将进程在信号量上挂起,等待其他进程释放锁后再唤醒。
        if (mtx->semaphore) {         
            // 当前在该信号量上等待的进程数目加一
            (void) ngx_atomic_fetch_add(mtx->wait, 1);
            // 尝试获取一次锁,如果获取成功,将等待的进程数目减一,然后返回
            if (*mtx->lock == 0 && ngx_atomic_cmp_set(mtx->lock, 0, ngx_pid)) {
                (void) ngx_atomic_fetch_add(mtx->wait, -1);
                return;
            }
            //  在信号量上进行等待
            while (sem_wait(&mtx->sem) == -1) {
                ngx_err_t  err;
                err = ngx_errno;
                if (err != NGX_EINTR) {
                    break;
                }
            }

            // 执行到此,肯定是其他进程释放锁了,
            所以继续回到循环的开始,尝试再次获取锁
            continue;
        }
#endif
        // 在没有获取到锁,且不使用信号量时,会执行到这里.
           通过 sched_yield 函数让调度器暂时将进程切出,让其他进程执行。
        // 在其它进程执行后有可能释放锁,那么下次调度到本进程时,则有可能获取成功。
        ngx_sched_yield();
    }
}

非阻塞式的锁

非阻塞的锁就非常简单了

ngx_shmtx_trylock(ngx_shmtx_t *mtx)
{
    return (*mtx->lock == 0 && ngx_atomic_cmp_set(mtx->lock, 0, ngx_pid));
}

相关资料

前一篇        返回首页        后一篇

| Copyright 2009 - 2018 by Runz |