Contents

Java中通过Key获取锁

1. 概述

在本文中,我们将了解如何在特定键上获得锁,以防止对该键的并发操作,而不会妨碍对其他键的操作。 一般来说,我们需要实现两种方法并了解如何操作它们:

  • void lock(String key)
  • void unlock(String key)

为了本教程的简单性,我们总是假设我们的键是Strings。您可以在正确定义equalshashCode方法的唯一条件下将它们替换为您需要的对象类型,因为我们将它们用作HashMap 键。

2. 一个简单的互斥锁

首先,假设我们想要阻止任何请求的操作,如果相应的键已经在使用中。在这里,我们宁愿定义一个boolean tryLock(String key)方法,而不是我们想象的lock方法。

具体来说,我们的目标是维护一密钥,我们将随时用正在使用的密钥填充这些密钥。因此,当对某个键请求新操作时,如果我们发现该键已被另一个线程使用,我们将不得不拒绝它。

我们在这里面临的问题是Set没有线程安全的实现。因此,我们将使用由ConcurrentHashMap 支持的Set。使用ConcurrentHashMap可以保证我们在多线程环境中的数据一致性。

让我们看看它的实际效果:

public class SimpleExclusiveLockByKey {
    private static Set<String> usedKeys= ConcurrentHashMap.newKeySet();
    
    public boolean tryLock(String key) {
        return usedKeys.add(key);
    }
    
    public void unlock(String key) {
        usedKeys.remove(key);
    }
}

下面是我们如何使用这个类:

String key = "key";
SimpleExclusiveLockByKey lockByKey = new SimpleExclusiveLockByKey();
try {
    lockByKey.tryLock(key);
    // insert the code that needs to be executed only if the key lock is available
} finally { // CRUCIAL
    lockByKey.unlock(key);
}

**让我们坚持finally 块的存在:在其中调用unlock方法至关重要。**这样,即使我们的代码在try括号内抛出exception,我们也会解锁密钥。

3. 按键获取和释放锁

现在,让我们进一步深入研究这个问题,并说我们不想简单地拒绝对相同键的同时操作,但我们宁愿让新的传入操作等到键上的当前操作完成。

申请流程将是:

  • 第一个线程请求一个键上的锁:它获取键上的锁
  • 第二个线程请求锁定同一个键:线程 2 被告知等待
  • 第一个线程释放键上的锁
  • 第二个线程获得键上的锁并可以执行其操作

3.1. 使用线程计数器定义锁

在这种情况下,使用Lock 听起来很自然。简而言之,Lock是一个用于线程同步的对象,它允许阻塞线程直到它被获取。Lock是一个接口——我们将使用ReentrantLock,它的基本实现。

让我们首先将Lock包装在一个内部类中。此类将能够跟踪当前等待锁定密钥的线程数。它将公开两种方法,一种用于增加线程计数器,另一种用于减少它:

private static class LockWrapper {
    private final Lock lock = new ReentrantLock();
    private final AtomicInteger numberOfThreadsInQueue = new AtomicInteger(1);
    private LockWrapper addThreadInQueue() {
        numberOfThreadsInQueue.incrementAndGet(); 
        return this;
    }
    private int removeThreadFromQueue() {
        return numberOfThreadsInQueue.decrementAndGet(); 
    }
}

3.2. 让锁处理排队线程

此外,我们将继续使用ConcurrentHashMap。但是不像我们之前那样简单地提取Map的键,我们将使用LockWrapper对象作为值:

private static ConcurrentHashMap<String, LockWrapper> locks = new ConcurrentHashMap<String, LockWrapper>();

当一个线程想要获取一个键上的锁时,我们需要查看该键是否已经存在LockWrapper

  • 如果没有,我们将为给定键实例化一个新的 LockWrapper,并将计数器设置为 1
  • 如果是这样,我们将返回现有的LockWrapper并增加其关联的计数器

让我们看看这是如何完成的:

public void lock(String key) {
    LockWrapper lockWrapper = locks.compute(key, (k, v) -> v == null ? new LockWrapper() : v.addThreadInQueue());
    lockWrapper.lock.lock();
}

由于使用了HashMap计算方法,代码非常简洁。让我们详细介绍一下此方法的功能:

  • 计算方法应用于对象locks,以key作为其第一个参数:检索与key in locks对应的初始值
  • compute 的第二个参数给出的BiFunction 应用于key和初始值:结果给出一个新值
  • 新值替换lockkey的初始值

3.3. 解锁并可选择删除地图条目

此外,当线程释放锁时,我们将减少与LockWrapper关联的线程数。如果计数降至零,那么我们将从ConcurrentHashMap中删除键:

public void unlock(String key) {
    LockWrapper lockWrapper = locks.get(key);
    lockWrapper.lock.unlock();
    if (lockWrapper.removeThreadFromQueue() == 0) { 
        // NB : We pass in the specific value to remove to handle the case where another thread would queue right before the removal
        locks.remove(key, lockWrapper);
    }
}

3.4. 概括

简而言之,让我们看看我们整个班级最终的样子:

public class LockByKey {
    
    private static class LockWrapper {
        private final Lock lock = new ReentrantLock();
        private final AtomicInteger numberOfThreadsInQueue = new AtomicInteger(1);

        private LockWrapper addThreadInQueue() {
            numberOfThreadsInQueue.incrementAndGet(); 
            return this;
        }

        private int removeThreadFromQueue() {
            return numberOfThreadsInQueue.decrementAndGet(); 
        }

    }

    private static ConcurrentHashMap<String, LockWrapper> locks = new ConcurrentHashMap<String, LockWrapper>();
    
    public void lock(String key) {
        LockWrapper lockWrapper = locks.compute(key, (k, v) -> v == null ? new LockWrapper() : v.addThreadInQueue());
        lockWrapper.lock.lock();
    }

    public void unlock(String key) {
        LockWrapper lockWrapper = locks.get(key);
        lockWrapper.lock.unlock();
        if (lockWrapper.removeThreadFromQueue() == 0) { 
            // NB : We pass in the specific value to remove to handle the case where another thread would queue right before the removal
            locks.remove(key, lockWrapper);
        }
    }

}

用法和我们之前的很相似:

String key = "key"; 
LockByKey lockByKey = new LockByKey(); 
try { 
    lockByKey.lock(key);
    // insert your code here 
} finally { // CRUCIAL 
    lockByKey.unlock(key); 
}

4.同时允许多个动作

最后但同样重要的是,让我们考虑另一种情况:我们希望将允许同时作用于同一个键的线程数限制为某个整数n,而不是一次只允许一个线程对给定键执行操作。为简单起见,我们将设置n = 2。

让我们广泛描述我们的用例:

  • 第一个线程想要获取键上的锁:它将被允许这样做
  • 第二个线程想要获取相同的锁:它也将被允许
  • 第三个线程请求对同一个键的锁:它必须排队,直到前两个线程之一释放它的锁

信号量 就是为此而生的。semaphore是用于限制同时访问资源的线程数的对象。

全局功能和代码看起来与我们的锁非常相似:

public class SimultaneousEntriesLockByKey {
    private static final int ALLOWED_THREADS = 2;
    
    private static ConcurrentHashMap<String, Semaphore> semaphores = new ConcurrentHashMap<String, Semaphore>();
    
    public void lock(String key) {
        Semaphore semaphore = semaphores.compute(key, (k, v) -> v == null ? new Semaphore(ALLOWED_THREADS) : v);
        semaphore.acquireUninterruptibly();
    }
    
    public void unlock(String key) {
        Semaphore semaphore = semaphores.get(key);
        semaphore.release();
        if (semaphore.availablePermits() == ALLOWED_THREADS) { 
            semaphores.remove(key, semaphore);
        }  
    }
    
}

用法相同:

String key = "key"; 
SimultaneousEntriesLockByKey lockByKey = new SimultaneousEntriesLockByKey(); 
try { 
    lockByKey.lock(key); 
    // insert your code here 
} finally { // CRUCIAL 
    lockByKey.unlock(key); 
}