Java实现基于Redis的分布式锁

    xiaoxiao2023-03-24  2

    Java实现基于Redis的分布式锁

     单JVM内同步好办, 直接用JDK提供的锁就可以了,但是跨进程同步靠这个肯定是不可能的,这种情况下肯定要借助第三方,我这里实现用Redis,当然还有很多其他的实现方式。其实基于Redis实现的原理还算比较简单的,在看代码之前建议大家先去这里看看原理,我就不翻译了,免得变味了,看懂了之后看代码应该就容易理解了。

     

    时间统一问题:各个客户端加锁时需要获取时间,而这个时间都不应当从本地获取,因为各个客户端的时间并不是一致的,因此需要提供一个TimeServer提供获取时间的服务,下面源码中用到的关于时间服务的三个类(包括TimeServer、TimeClient和Time Client Exception)会在另一篇博客《Java NIO时间服务》给源码.

     

    我这里不实现JDK的java.util.concurrent.locks.Lock接口,而是自定义一个,因为JDK的有个newCondition()方法我这里暂时没实现。这个Lock提供了5个lock方法的变体,可以自行选择使用哪一个来获取锁,我的想法是

    最好用带超时返回的那几个方法,因为不这样的话,假如redis挂了,线程永远都在那死循环了(关于这里,应该还可以进一步优化,如果redis挂了,Jedis的操作肯定会抛异常之类的,可以定义个机制让redis挂了的时候通知使用这个lock的用户,或者说是线程)。

     

     

    Java代码  package cc.lixiaohui.lock;      import java.util.concurrent.TimeUnit;      public interface Lock extends Releasable{          /**       * 阻塞性的获取锁, 不响应中断       */       void lock();              /**       * 阻塞性的获取锁, 响应中断       *        * @throws InterruptedException       */       void lockInterruptibly() throws InterruptedException;              /**       * 尝试获取锁, 获取不到立即返回, 不阻塞       */       boolean tryLock();              /**       * 超时自动返回的阻塞性的获取锁, 不响应中断       *        * @param time       * @param unit       * @return {@code true} 若成功获取到锁, {@code false} 若在指定时间内未获取到锁       *                */       boolean tryLock(long time, TimeUnit unit);              /**       * 超时自动返回的阻塞性的获取锁, 响应中断       *        * @param time       * @param unit       * @return {@code true} 若成功获取到锁, {@code false} 若在指定时间内未获取到锁       * @throws InterruptedException 在尝试获取锁的当前线程被中断       */       boolean tryLockInterruptibly(long time, TimeUnit unit) throws InterruptedException;              /**       * 释放锁       */       void unlock();          }  

     

    Releasable.java :

    Java代码  package cc.lixiaohui.lock;      /**   * 代表持有资源的对象, 例如   * <ul>   * <li> 基于jedis的锁自然持有与redis server的连接 </li>   * <li> 基于时间统一的的锁自然持有与time server的连接</li>   * </ul>   * 因此锁应该实现该接口, 并在{@link Releasable#resease() release} 方法中释放相关的连接   *    * @author lixiaohui   *   */   public interface Releasable {              /**       * 释放持有的所有资源       */       void release();          }  

     

    看Lock的抽象实现:

     

    Java代码  package cc.lixiaohui.lock;      import java.util.concurrent.TimeUnit;      /**   * 锁的骨架实现, 真正的获取锁的步骤由子类去实现.   *    * @author lixiaohui   *   */   public abstract class AbstractLock implements Lock {          /**       * <pre>       * 这里需不需要保证可见性值得讨论, 因为是分布式的锁,        * 1.同一个jvm的多个线程使用不同的锁对象其实也是可以的, 这种情况下不需要保证可见性        * 2.同一个jvm的多个线程使用同一个锁对象, 那可见性就必须要保证了.       * </pre>       */       protected volatile boolean locked;          /**       * 当前jvm内持有该锁的线程(if have one)       */       private Thread exclusiveOwnerThread;          public void lock() {           try {               lock(false0nullfalse);           } catch (InterruptedException e) {               // TODO ignore           }       }          public void lockInterruptibly() throws InterruptedException {           lock(false0nulltrue);       }          public boolean tryLock(long time, TimeUnit unit) {           try {               return lock(true, time, unit, false);           } catch (InterruptedException e) {               // TODO ignore           }           return false;       }          public boolean tryLockInterruptibly(long time, TimeUnit unit) throws InterruptedException {           return lock(true, time, unit, true);       }          public void unlock() {           // TODO 检查当前线程是否持有锁           if (Thread.currentThread() != getExclusiveOwnerThread()) {               throw new IllegalMonitorStateException("current thread does not hold the lock");           }                      unlock0();           setExclusiveOwnerThread(null);       }          protected void setExclusiveOwnerThread(Thread thread) {           exclusiveOwnerThread = thread;       }          protected final Thread getExclusiveOwnerThread() {           return exclusiveOwnerThread;       }          protected abstract void unlock0();              /**       * 阻塞式获取锁的实现       *        * @param useTimeout        * @param time       * @param unit       * @param interrupt 是否响应中断       * @return       * @throws InterruptedException       */       protected abstract boolean lock(boolean useTimeout, long time, TimeUnit unit, boolean interrupt) throws InterruptedException;      }  

     

     基于Redis的最终实现(not reentrant),关键的获取锁,释放锁的代码在这个类的lock方法和unlock0方法里,大家可以只看这两个方法然后完全自己写一个:

     

    Java代码  package cc.lixiaohui.lock;      import java.io.IOException;   import java.net.SocketAddress;   import java.util.concurrent.TimeUnit;      import redis.clients.jedis.Jedis;   import cc.lixiaohui.lock.time.nio.client.TimeClient;      /**   * <pre>   * 基于Redis的SETNX操作实现的分布式锁   *    * 获取锁时最好用lock(long time, TimeUnit unit), 以免网路问题而导致线程一直阻塞   *    * <a href="http://redis.io/commands/setnx">SETNC操作参考资料</a>   * </pre>   *    * @author lixiaohui   *   */   public class RedisBasedDistributedLock extends AbstractLock {              private Jedis jedis;              private TimeClient timeClient;              // 锁的名字       protected String lockKey;              // 锁的有效时长(毫秒)       protected long lockExpires;              public RedisBasedDistributedLock(Jedis jedis, String lockKey, long lockExpires, SocketAddress timeServerAddr) throws IOException {           this.jedis = jedis;           this.lockKey = lockKey;           this.lockExpires = lockExpires;           timeClient = new TimeClient(timeServerAddr);       }              // 阻塞式获取锁的实现       protected boolean lock(boolean useTimeout, long time, TimeUnit unit, boolean interrupt) throws InterruptedException{           if (interrupt) {               checkInterruption();           }                      // 超时控制 的时间可以从本地获取, 因为这个和锁超时没有关系, 只是一段时间区间的控制           long start = localTimeMillis();           long timeout = unit.toMillis(time); // if !useTimeout, then it's useless                      while (useTimeout ? isTimeout(start, timeout) : true) {               if (interrupt) {                   checkInterruption();               }                              long lockExpireTime = serverTimeMillis() + lockExpires + 1;//锁超时时间               String stringOfLockExpireTime = String.valueOf(lockExpireTime);                              if (jedis.setnx(lockKey, stringOfLockExpireTime) == 1) { // 获取到锁                   // TODO 成功获取到锁, 设置相关标识                   locked = true;                   setExclusiveOwnerThread(Thread.currentThread());                   return true;               }                              String value = jedis.get(lockKey);               if (value != null && isTimeExpired(value)) { // lock is expired                   // 假设多个线程(非单jvm)同时走到这里                   String oldValue = jedis.getSet(lockKey, stringOfLockExpireTime); // getset is atomic                   // 但是走到这里时每个线程拿到的oldValue肯定不可能一样(因为getset是原子性的)                   // 加入拿到的oldValue依然是expired的,那么就说明拿到锁了                   if (oldValue != null && isTimeExpired(oldValue)) {                       // TODO 成功获取到锁, 设置相关标识                       locked = true;                       setExclusiveOwnerThread(Thread.currentThread());                       return true;                   }               } else {                    // TODO lock is not expired, enter next loop retrying               }           }           return false;       }              public boolean tryLock() {           long lockExpireTime = serverTimeMillis() + lockExpires + 1;//锁超时时间           String stringOfLockExpireTime = String.valueOf(lockExpireTime);                      if (jedis.setnx(lockKey, stringOfLockExpireTime) == 1) { // 获取到锁               // TODO 成功获取到锁, 设置相关标识               locked = true;               setExclusiveOwnerThread(Thread.currentThread());               return true;           }                      String value = jedis.get(lockKey);           if (value != null && isTimeExpired(value)) { // lock is expired               // 假设多个线程(非单jvm)同时走到这里               String oldValue = jedis.getSet(lockKey, stringOfLockExpireTime); // getset is atomic               // 但是走到这里时每个线程拿到的oldValue肯定不可能一样(因为getset是原子性的)               // 假如拿到的oldValue依然是expired的,那么就说明拿到锁了               if (oldValue != null && isTimeExpired(oldValue)) {                   // TODO 成功获取到锁, 设置相关标识                   locked = true;                   setExclusiveOwnerThread(Thread.currentThread());                   return true;               }           } else {                // TODO lock is not expired, enter next loop retrying           }                      return false;       }              /**       * Queries if this lock is held by any thread.       *        * @return {@code true} if any thread holds this lock and       *         {@code false} otherwise       */       public boolean isLocked() {           if (locked) {               return true;           } else {               String value = jedis.get(lockKey);               // TODO 这里其实是有问题的, 想:当get方法返回value后, 假设这个value已经是过期的了,               // 而就在这瞬间, 另一个节点set了value, 这时锁是被别的线程(节点持有), 而接下来的判断               // 是检测不出这种情况的.不过这个问题应该不会导致其它的问题出现, 因为这个方法的目的本来就               // 不是同步控制, 它只是一种锁状态的报告.               return !isTimeExpired(value);           }       }          @Override       protected void unlock0() {           // TODO 判断锁是否过期           String value = jedis.get(lockKey);           if (!isTimeExpired(value)) {               doUnlock();           }       }              public void release() {           jedis.close();           timeClient.close();       }              private void doUnlock() {           jedis.del(lockKey);       }          private void checkInterruption() throws InterruptedException {           if(Thread.currentThread().isInterrupted()) {               throw new InterruptedException();           }       }              private boolean isTimeExpired(String value) {           // 这里拿服务器的时间来比较           return Long.parseLong(value) < serverTimeMillis();       }              private boolean isTimeout(long start, long timeout) {           // 这里拿本地的时间来比较           return start + timeout > System.currentTimeMillis();       }              private long serverTimeMillis(){           return timeClient.currentTimeMillis();       }              private long localTimeMillis() {           return System.currentTimeMillis();       }          }  

     

    如果将来还换一种实现方式(比如zookeeper之类的),到时直接继承AbstractLock并实现lock(boolean useTimeout, long time, TimeUnit unit, boolean interrupt), unlock0()方法即可(所谓抽象嘛)

     

    测试

     

    模拟全局ID增长器,设计一个IDGenerator类,该类负责生成全局递增ID,其代码如下:

     

    Java代码  package cc.lixiaohui.lock.example;      import java.math.BigInteger;   import java.util.concurrent.TimeUnit;      import cc.lixiaohui.lock.Lock;   import cc.lixiaohui.lock.Releasable;      /**   * 模拟分布式环境中的ID生成    * @author lixiaohui   *   */   public class IDGenerator implements Releasable{          private static BigInteger id = BigInteger.valueOf(0);          private final Lock lock;          private static final BigInteger INCREMENT = BigInteger.valueOf(1);          public IDGenerator(Lock lock) {           this.lock = lock;       }              public String getAndIncrement() {           if (lock.tryLock(3, TimeUnit.SECONDS)) {               try {                   // TODO 这里获取到锁, 访问临界区资源                   System.out.println(Thread.currentThread().getName() + " get lock");                   return getAndIncrement0();               } finally {                   lock.unlock();               }           }           return null;           //return getAndIncrement0();       }              public void release() {           lock.release();       }          private String getAndIncrement0() {           String s = id.toString();           id = id.add(INCREMENT);           return s;       }   }  

     

    测试主逻辑:同一个JVM内开两个线程死循环地(循环之间无间隔,有的话测试就没意义了)获取ID(我这里并不是死循环而是跑20s),获取到ID存到同一个Set里面,在存之前先检查该ID在set中是否存在,如果已存在,则让两个线程都停止。如果程序能正常跑完20s,那么说明这个分布式锁还算可以满足要求,如此测试的效果应该和不同JVM(也就是真正的分布式环境中)测试的效果是一样的,下面是测试类的代码:

     

    Java代码  package cc.lixiaohui.DistributedLock.DistributedLock;      import java.net.InetSocketAddress;   import java.net.SocketAddress;   import java.util.HashSet;   import java.util.Set;      import org.junit.Test;      import redis.clients.jedis.Jedis;   import cc.lixiaohui.lock.Lock;   import cc.lixiaohui.lock.RedisBasedDistributedLockV0_0;   import cc.lixiaohui.lock.RedisBasedDistributedLock;   import cc.lixiaohui.lock.example.IDGenerator;      public class IDGeneratorTest {              private static Set<String> generatedIds = new HashSet<String>();              private static final String LOCK_KEY = "lock.lock";       private static final long LOCK_EXPIRE = 5 * 1000;              @Test       public void testV1_0() throws Exception {                      SocketAddress addr = new InetSocketAddress("localhost"9999);                      Jedis jedis1 = new Jedis("localhost"6379);           Lock lock1 = new RedisBasedDistributedLock(jedis1, LOCK_KEY, LOCK_EXPIRE, addr);           IDGenerator g1 = new IDGenerator(lock1);           IDConsumeTask consume1 = new IDConsumeTask(g1, "consume1");                      Jedis jedis2 = new Jedis("localhost"6379);           Lock lock2 = new RedisBasedDistributedLock(jedis2, LOCK_KEY, LOCK_EXPIRE, addr);           IDGenerator g2 = new IDGenerator(lock2);           IDConsumeTask consume2 = new IDConsumeTask(g2, "consume2");                      Thread t1 = new Thread(consume1);           Thread t2 = new Thread(consume2);           t1.start();           t2.start();                      Thread.sleep(20 * 1000); //让两个线程跑20秒                      IDConsumeTask.stopAll();                      t1.join();           t2.join();       }              static String time() {           return String.valueOf(System.currentTimeMillis() / 1000);       }              static class IDConsumeTask implements Runnable {              private IDGenerator idGenerator;                      private String name;                      private static volatile boolean stop;                      public IDConsumeTask(IDGenerator idGenerator, String name) {               this.idGenerator = idGenerator;               this.name = name;           }                      public static void stopAll() {               stop = true;           }                      public void run() {               System.out.println(time() + ": consume " + name + " start ");               while (!stop) {                   String id = idGenerator.getAndIncrement();                   if (id != null) {                       if(generatedIds.contains(id)) {                           System.out.println(time() + ": duplicate id generated, id = " + id);                           stop = true;                           continue;                       }                                               generatedIds.add(id);                       System.out.println(time() + ": consume " + name + " add id = " + id);                   }               }               // 释放资源               idGenerator.release();               System.out.println(time() + ": consume " + name + " done ");           }                  }          }  

     

    说明一点,我这里停止两个线程的方式并不是很好,我是为了方便才这么做的,因为只是测试,最好不要这么做。

     

    测试结果

    跑20s打印的东西太多,前面打印的被clear了,只有差不多跑完的时候才有,下面截图。说明了这个锁能正常工作:

       

    当IDGererator没有加锁(即IDGererator的getAndIncrement方法内部获取id时不上锁)时,测试是不通过的,非常大的概率中途就会停止,下面是不加锁时的测试结果:

     

    这个1秒都不到:   这个也1秒都不到:

       

    转载请注明原文地址: https://ju.6miu.com/read-1201092.html
    最新回复(0)