自旋锁spinlock剖析与改进

    xiaoxiao2023-03-24  4

    1, spinlock介绍

      spinlock又称自旋锁,线程通过busy-wait-loop的方式来获取锁,任时刻只有一个线程能够获得锁,其他线程忙等待直到获得锁。spinlock在多处理器多线程环境的场景中有很广泛的使用,一般要求使用spinlock的临界区尽量简短,这样获取的锁可以尽快释放,以满足其他忙等的线程。Spinlock和mutex不同,spinlock不会导致线程的状态切换(用户态->内核态),但是spinlock使用不当(如临界区执行时间过长)会导致cpu busy飙高。

      2, spinlock与mutex对比

      2.1,优缺点比较

      spinlock不会使线程状态发生切换,mutex在获取不到锁的时候会选择sleep。

      mutex获取锁分为两阶段,第一阶段在用户态采用spinlock锁总线的方式获取一次锁,如果成功立即返回;否则进入第二阶段,调用系统的futex锁去sleep,当锁可用后被唤醒,继续竞争锁。

      Spinlock优点:没有昂贵的系统调用,一直处于用户态,执行速度快。

      Spinlock缺点:一直占用cpu,而且在执行过程中还会锁bus总线,锁总线时其他处理器不能使用总线。

      Mutex优点:不会忙等,得不到锁会sleep。

      Mutex缺点:sleep时会陷入到内核态,需要昂贵的系统调用。

      2.2,使用准则

      Spinlock使用准则:临界区尽量简短,控制在100行代码以内,不要有显式或者隐式的系统调用,调用的函数也尽量简短。例如,不要在临界区中调用read,write,open等会产生系统调用的函数,也不要去sleep;strcpy,memcpy等函数慎用,依赖于数据的大小。

      3, spinlock系统实现

      spinlock的实现方式有多种,但是思想都是差不多的,现罗列一下:

      3.1,glibc-2.9中的实现方法:

    int pthread_spin_lock (lock) pthread_spinlock_t *lock ; { asm ( " \n " " 1:\t "  LOCK_PREFIX  " decl %0\n\t " " jne 2f\n\t " " .subsection 1\n\t " " .align 16\n " " 2:\trep; nop\n\t " " cmpl $0, %0\n\t " " jg 1b\n\t " " jmp 2b\n\t " " .previous " " =m "  (*lock) :  " m "  (*lock)) ; return  0 ; }

      执行过程:

      1,lock_prefix 即 lock。lock decl %0,锁总线将%0(即lock变量)减一。Lock可以保证接下来一条指令的原子性。

      2, 如果lock=1,decl的执行结果为lock=0,ZF标志位为1,直接跳到return 0;否则跳到标签2。也许要问,为啥能直接跳到return 0呢?因为subsection和previous之间的代码被编译到别的段中,因此jne之后紧接着的代码就是 return 0 (leaveq;retq)。Rep nop在经过编译器编译之后被编译成 pause。

      3, 如果跳到标签2,说明获取锁不成功,循环等待lock重新变成1,如果lock为1跳到标签1重新竞争锁。

      该实现采用的是AT&T的汇编语法,更详细的执行流程解释可以参考“五竹”大牛的文档。

      3.2,系统自带(glibc-2.3.4)spinlock反汇编代码:

      系统环境:

    2.6.9 - 89 .ELsmp # 1  SMP x86_64 x86_64 x86_64 GNU/Linux (gdb) disas pthread_spin_lock Dump of assembler code for function pthread_spin_lock: //eax寄存器清零,做返回值 0x0000003056a092f0 <pthread_spin_lock+ 0 >: xor %eax , %eax //rdi存的是lock锁地址,原子减一 0x0000003056a092f2 <pthread_spin_lock+ 2 >: lock decl (%rdi) //杯了个催的,加锁不成功,跳转,开始busy wait 0x0000003056a092f5 <pthread_spin_lock+ 5 >: jne 0x3056a09300 <pthread_spin_lock+ 16 > //终于夹上了…加锁成功,返回 0x0000003056a092f7 <pthread_spin_lock+ 7 >: retq ……………………………………….省略若干nop………………………………………. 0x0000003056a092ff <pthread_spin_lock+ 15 >: nop //pause指令降低CPU功耗 0x0000003056a09300 <pthread_spin_lock+ 16 >: pause //检查锁是否可用 0x0000003056a09302 <pthread_spin_lock+ 18 >: cmpl $ 0 × 0 , (%rdi) //回跳,重新锁总线获取锁 0x0000003056a09305 <pthread_spin_lock+ 21 >: jg 0x3056a092f2 <pthread_spin_lock+ 2 > //长夜漫漫,爱上一个不回家的人,继续等~ 0x0000003056a09307 <pthread_spin_lock+ 23 >: jmp 0x3056a09300 <pthread_spin_lock+ 16 > 0x0000003056a09309 <pthread_spin_lock+ 25 >: nop ……………………………………….省略若干nop………………………………………. End of assembler dump. Glibc的汇编代码还是很简洁的,没有多余的代码。

      4, Pause指令解释(from intel):

      Description

      Improves the performance of spin-wait loops. When executing a “spin-wait loop,” a Pentium 4 or Intel Xeon processor suffers a severe performance penalty when exiting the loop because it detects a possible memory order violation. The PAUSE instruction provides a hint to the processor that the code sequence is a spin-wait loop. The processor uses this hint to avoid the memory order violation in most situations, which greatly improves processor performance. For this reason, it is recommended that a PAUSE instruction be placed in all spin-wait loops.

      提升spin-wait-loop的性能,当执行spin-wait循环的时候,笨死和小强处理器会因为在退出循环的时候检测到memory order violation而导致严重的性能损失,pause指令就相当于提示处理器哥目前处于spin-wait中。在绝大多数情况下,处理器根据这个提示来避免violation,藉此大幅提高性能,由于这个原因,我们建议在spin-wait中加上一个pause指令。

      名词解释(以下为本人猜想):memory order violation,直译为-内存访问顺序冲突,当处理器在(out of order)乱序执行的流水线上去内存load某个内存地址的值(此处是lock)的时候,发现这个值正在被store,而且store本身就在load之前,对于处理器来说,这就是一个hazard,流水流不起来。

      在本文中,具体是指当一个获得锁的工作线程W从临界区退出,在调用unlock释放锁的时候,有若干个等待线程S都在自旋检测锁是否可用,此时W线程会产生一个store指令,若干个S线程会产生很多load指令,在store之后的load指令要等待store在流水线上执行完毕才能执行,由于处理器是乱序执行,在没有store指令之前,处理器对多个没有依赖的load是可以随机乱序执行的,当有了store指令之后,需要reorder重新排序执行,此时会严重影响处理器性能,按照intel的说法,会带来25倍的性能损失。Pause指令的作用就是减少并行load的数量,从而减少reorder时所耗时间。

      An additional function of the PAUSE instruction is to reduce the power consumed by a Pentium 4 processor while executing a spin loop. The Pentium 4 processor can execute a spin-wait loop extremely quickly, causing the processor to consume a lot of power while it waits for the resource it is spinning on to become available. Inserting a pause instruction in a spin-wait loop greatly reduces the processor’s power consumption.

      Pause指令还有一个附加作用是减少笨死处理器在执行spin loop时的耗电量。当笨死探测锁是否可用时,笨死以飞快的速度执行spin-wait loop,导致消耗大量电量。在spin-wait loop中插入一条pause指令会显著减少处理器耗电量。

      This instruction was introduced in the Pentium 4 processors, but is backward compat­ible with all IA-32 processors. In earlier IA-32 processors, the PAUSE instruction operates like a NOP instruction. The Pentium 4 and Intel Xeon processors implement the PAUSE instruction as a pre-defined delay. The delay is finite and can be zero for some processors. This instruction does not change the architectural state of the processor (that is, it performs essentially a delaying no-op operation).

      该指令在笨死中被引入,但是向后兼容所有IA-32处理器,在早期的IA-32处理器中,pause指令以nop的方式来运行,笨死和小强以一个预定义delay的方式来实现pause,该延迟是有限的,在某些处理器上可能是0,pause指令并不改变处理器的架构(位?)状态(也就是说,它是一个延迟执行的nop指令)。至于Pause指令的延迟有多大,intel并没有给出具体数值,但是据某篇文章给出的测试结果,大概有38~40个clock左右(官方数字:nop延迟是1个clock),一下子延迟40个clock,intel也够狠的。

      This instruction’s operation is the same in non-64-bit modes and 64-bit mode.

      该指令在64位与非64位模式下的表现是一致的。

      5, spinlock改进

      根据上一小节的分析,pause指令最主要的作用是减低功耗和延迟执行下一条指令。所以我们可以有这样的猜想:如果在spin-wait的过程中,记录下加锁失败的次数,对失败的加锁行为进行惩罚(failure penalty),让等待时间和失败次数成正比,即失败次数越多等待时间越长、执行的pause指令越多。

    这样的好处有:

      A,在锁竞争很激烈的情况下,通过增加延迟来降低锁总线的次数,当锁总线次数降低时,系统其它程序对内存的竞争减少,提高系统整体吞吐量。

      B,在锁竞争很激烈的情况下,减少计算指令的执行次数,降低功耗,低碳低碳!!!

      C,当锁竞争不激烈时,还能获得和原来一样的性能。

    但是带来的问题有:

      A,如果竞争锁失败次数越多,pause次数越多的话,会导致有些线程产生starvation。

      B,在某些特殊的场景下,锁竞争很小的时候,failure penalty可能会导致程序执行时间变长,进而导致总的加锁次数不一定减少。

    解决方案:

      当失败次数超过一个阈值的时候,将失败次数清零,使线程以洁白之躯重新参与竞争;对于少数failure penalty导致执行时间延长的情况,可以先忽略。

    基于failure penalty的改进实现:

      为了便于区分,起了很山寨的名字叫newlock,对比对象是sim_spin_lock,sim_spin_lock与pthread_spin_lock算法一致,实现细节基本一致,之所以加入这种对比是为了更加精确地衡量新算法的效果。ecx寄存器记录下本次加锁过程的失败次数。

    int newlock(pthread_spinlock_t *lock){ __asm__( //eax清零,记录当前lock次数 " xor %%eax,%%eax\n\t " //ecx清零,记录总的lock次数 " xor %%ecx,%%ecx\n\t " //记录加锁次数 " 1:incl %%ecx\n\t " //记录当前加锁次数 " incl %%eax\n\t " //锁总线,开始加锁,rdi寄存器存储的是lock变量的地址 " lock decl(%%rdi)\n\t " //加锁不成功 " jne 2f\n\t " //加锁成功,将锁总线次数作为返回值返回 " movl %%ecx,%%eax\n\t " " leave\n\t " " retq\n\t " " nop\n\t " " nop\n\t " //pause跳转标签 " 5:pause\n\t " " 4:pause\n\t " " 3:pause\n\t " " 2:pause\n\t " //探测锁是否可用 " cmpl $0x0,(%%rdi)\n\t " //锁可用,重新加锁 " jg 1b\n\t " //加锁次数与4取模 " and $0x3,%%eax\n\t " //根据结果进行跳转 " cmpl $0x0,%%eax\n\t " " je 2b\n\t " " cmpl $0x1,%%eax\n\t " " je 3b\n\t " " cmpl $0x2,%%eax\n\t " " je 4b\n\t " " je 5b\n\t " " nop\n\t " " nop\n\t " : : " D " (lock) : " %eax " , " %ecx " , " %edx " ) ; } 与pthread_spin_lock算法相同的sim_spin_lock: int sim_spin_lock(pthread_spinlock_t *lock){ __asm__( //eax清零,记录当前lock次数 " xor %%eax,%%eax\n\t " //ecx清零,记录总的lock次数 " xor %%ecx,%%ecx\n\t " //记录加锁次数 " 1:incl %%ecx\n\t " //记录当前加锁次数 " incl %%eax\n\t " //锁总线,开始加锁,rdi寄存器存储的是lock变量的地址 " lock decl(%%rdi)\n\t " //加锁不成功 " jne 2f\n\t " //加锁成功,将锁总线次数作为返回值返回 " movl %%ecx,%%eax\n\t " " leave\n\t " " retq\n\t " " nop\n\t " " nop\n\t " //pause跳转标签 " 2:pause\n\t " //探测锁是否可用 " cmpl $0x0,(%%rdi)\n\t " //锁可用,重新加锁 " jg 1b\n\t " " jmp 2b\n\t " " nop\n\t " " nop\n\t " : : " D " (lock) : " %eax " , " %ecx " , " %edx " ) ; }

      6, 性能对比

    测试环境: 

      处理器:8Core Intel(R) Xeon(R) CPU E5410  @ 2.33GHz

      Glibc版本:glibc-2.3.4-2.43

      GCC版本:3.4.6

    测试程序:  #include  < stdio.h > #include  < pthread.h > #include  < stdlib.h > #include  < sys / time.h > #include  " liblock.h " #define  MAX_LOOP 1000000 #define  CODE_LEN 10 pthread_spinlock_t mylock; long  g_count  = 0 ; int  thread_count  = 20 ; int  _nlock  = 0 ; int  lock_count  = 0 ; int  loop_count  = 1000000 ; int  code_len  = 1 ; void * func( void * arg){ int  j, k, m; for ( int  i = 0 ; i <  loop_count; i ++ ){ if (_nlock  == 0 ){ lock_count  +=  sim_spin_lock( & mylock); // pthread_spin_lock(&mylock); } else { lock_count  +=  newlock( & mylock); // newlock(&mylock); } g_count ++ ; // 下面这几句代码之间有很强的依赖性,流水hazard比较多 // 每句代码都要依赖上一句的执行结果,而且都是store操作 // 用于模仿实际情况中的临界区代码 for ( int  i = 0 ; i <  code_len; i ++ ){ j  =  k; m  =  j; k  =  m; m  =  j + 1 ; k  =  m + 2 ; j  =  m + k; } pthread_spin_unlock( & mylock); } return  NULL; } int  get_process_time( struct  timeval  * ptvStart) { struct  timeval tvEnd; gettimeofday( & tvEnd,NULL); return  ((tvEnd.tv_sec  -  ptvStart -> tv_sec) * 1000 + (tvEnd.tv_usec  -  ptvStart -> tv_usec) / 1000 ); } int  main( int  argc,  char * argv[]){ if (argc  < 3 ){ return 0 ; } int  ms_time  = 0 ; struct  timeval tvStart; _nlock  =  atoi(argv[ 1 ]); thread_count  =  atoi(argv[ 2 ]); loop_count  =  atoi(argv[ 3 ]); code_len  =  atoi(argv[ 4 ]); pthread_t  * tid  =  (pthread_t * )malloc( sizeof (pthread_t) * thread_count); pthread_spin_init( & mylock,  0 ); gettimeofday( & tvStart,NULL); for ( int  i = 0 ; i < thread_count; i ++ ){ pthread_create( & tid[i], NULL, func, NULL); } void * tret; for ( int  i = 0 ; i < thread_count; i ++ ){ int  ret  =  pthread_join(tid[i],  & tret); if (ret  != 0 ){ printf( " cannot join thread1 " ); } } ms_time  =  get_process_time( & tvStart); fprintf(stderr,  " g_count:%ld\tlock_count:%ld\tloop_count:%d\tcode_len:%d\ttime:%.2f\n " , \ g_count, lock_count, loop_count, code_len, ms_time / 1000.0f ); return 0 ; } Benchmark设置:

      >输入参数:

      线程数量:1,2,3,5,7,8,10,12,14,16,18,20,25,30,35,40

      每个线程加锁(循环)次数:5000000

      临界区长度:1,1+1*6,1+3*6,1+5*6,1+10*6,1+20*6,1+50*6,1+100*6

      case执行次数:每个case执行10次,尽量减少误差

      > 输出参数:

      执行时间:平均执行时间

      锁总线次数:平均锁总线次数

    执行时间对比:

    锁总线次数对比: 

      还有若干图,囿于篇幅和时间,亲们我就不贴了

      7, 实验结果分析

    定义: 

      竞争因子:竞争因子=线程数量/处理器个数。以本文为例,假设有16个线程,处理器为8核,那么竞争因子为2

    竞争因子与执行时间的关系: 

      当竞争因子为1/8,也就是只有一个线程的时候,很明显两种算法的执行时间是一致的,因为此时没有其他线程竞争锁;当竞争因子在1/8~4/8之间的时候,在某些情况下(依赖于临界区长度)新算法性能低于老算法;当竞争因子大于4/8的时候,新算法性能优于老算法。

    竞争因子与锁总线次数的关系: 

      当竞争因子为1/8的时候,新老算法一致(原理同上);当竞争因子在1/8~4/8之间的时候,大部分情况下老算法性能优于新算法;当竞争因子大于4/8的时候,新算法性能优于老算法,由于锁总线次数基数比较大,在图上可能比较难看出来。

    拐点: 

      细心的同学可能会发现有一个拐点:就是当临界区的长度为 1+50*6 的时候,也就是临界区有300行代码的时候,新算法锁总线次数要比老算法多,而且执行时间也长一些,这就牵扯到新算法的一个缺点:在某些情况下,当锁可用,需要去竞争锁的时候,由于线程还在pause中,只有等pause结束才能去竞争,而pause结束时,锁很可能不可用(被其他线程获取),根据新算法,加锁失败线程又要多pause一次,导致整体的锁总线次数和执行时间增加。这个拐点依赖于竞争因子、具体的处理器、具体的临界区代码、pause执行周期。

    不足: 

      由于时间的关系,试验做的不是很足;指令回避那块可以做优化(目前只是简单的cmpl和jmp);到底最大pause次数是多少需要试验来支撑,目前是4次;个人感觉pause的处理器实现粒度还是比较粗的,应该是intel的一个经验值,接下来的试验可以用nop来代替pause,这样得出来的数据应该会更为平滑一些,控制也更为细腻。

    总结: 

      当竞争比较激烈的时候,新算法在绝大部分情况下优于老算法(除了拐点),大概有2-5%的提升;当处理器竞争比较小的时候,尤其是竞争因子为3/8的时候,新算法不如老算法;如果用nop替代pause应该会有更好的表现。

    转载请注明原文地址: https://ju.6miu.com/read-1202078.html
    最新回复(0)