Java中通过Key获取锁
1. 概述
在本文中,我们将了解如何在特定键上获得锁,以防止对该键的并发操作,而不会妨碍对其他键的操作。 一般来说,我们需要实现两种方法并了解如何操作它们:
- void lock(String key)
- void unlock(String key)
为了本教程的简单性,我们总是假设我们的键是Strings。您可以在正确定义equals和hashCode方法的唯一条件下将它们替换为您需要的对象类型,因为我们将它们用作HashMap 键。
2. 一个简单的互斥锁
首先,假设我们想要阻止任何请求的操作,如果相应的键已经在使用中。在这里,我们宁愿定义一个boolean tryLock(String key)方法,而不是我们想象的lock方法。
具体来说,我们的目标是维护一组密钥,我们将随时用正在使用的密钥填充这些密钥。因此,当对某个键请求新操作时,如果我们发现该键已被另一个线程使用,我们将不得不拒绝它。
我们在这里面临的问题是Set没有线程安全的实现。因此,我们将使用由ConcurrentHashMap 支持的Set。使用ConcurrentHashMap可以保证我们在多线程环境中的数据一致性。
让我们看看它的实际效果:
public class SimpleExclusiveLockByKey {
private static Set<String> usedKeys= ConcurrentHashMap.newKeySet();
public boolean tryLock(String key) {
return usedKeys.add(key);
}
public void unlock(String key) {
usedKeys.remove(key);
}
}
下面是我们如何使用这个类:
String key = "key";
SimpleExclusiveLockByKey lockByKey = new SimpleExclusiveLockByKey();
try {
lockByKey.tryLock(key);
// insert the code that needs to be executed only if the key lock is available
} finally { // CRUCIAL
lockByKey.unlock(key);
}
**让我们坚持finally 块的存在:在其中调用unlock方法至关重要。**这样,即使我们的代码在try括号内抛出exception,我们也会解锁密钥。
3. 按键获取和释放锁
现在,让我们进一步深入研究这个问题,并说我们不想简单地拒绝对相同键的同时操作,但我们宁愿让新的传入操作等到键上的当前操作完成。
申请流程将是:
- 第一个线程请求一个键上的锁:它获取键上的锁
- 第二个线程请求锁定同一个键:线程 2 被告知等待
- 第一个线程释放键上的锁
- 第二个线程获得键上的锁并可以执行其操作
3.1. 使用线程计数器定义锁
在这种情况下,使用Lock 听起来很自然。简而言之,Lock是一个用于线程同步的对象,它允许阻塞线程直到它被获取。Lock是一个接口——我们将使用ReentrantLock,它的基本实现。
让我们首先将Lock包装在一个内部类中。此类将能够跟踪当前等待锁定密钥的线程数。它将公开两种方法,一种用于增加线程计数器,另一种用于减少它:
private static class LockWrapper {
private final Lock lock = new ReentrantLock();
private final AtomicInteger numberOfThreadsInQueue = new AtomicInteger(1);
private LockWrapper addThreadInQueue() {
numberOfThreadsInQueue.incrementAndGet();
return this;
}
private int removeThreadFromQueue() {
return numberOfThreadsInQueue.decrementAndGet();
}
}
3.2. 让锁处理排队线程
此外,我们将继续使用ConcurrentHashMap。但是不像我们之前那样简单地提取Map的键,我们将使用LockWrapper对象作为值:
private static ConcurrentHashMap<String, LockWrapper> locks = new ConcurrentHashMap<String, LockWrapper>();
当一个线程想要获取一个键上的锁时,我们需要查看该键是否已经存在LockWrapper:
- 如果没有,我们将为给定键实例化一个新的 LockWrapper,并将计数器设置为 1
- 如果是这样,我们将返回现有的LockWrapper并增加其关联的计数器
让我们看看这是如何完成的:
public void lock(String key) {
LockWrapper lockWrapper = locks.compute(key, (k, v) -> v == null ? new LockWrapper() : v.addThreadInQueue());
lockWrapper.lock.lock();
}
由于使用了HashMap的计算方法,代码非常简洁。让我们详细介绍一下此方法的功能:
- 计算方法应用于对象locks,以key作为其第一个参数:检索与key in locks对应的初始值
- compute 的第二个参数给出的BiFunction 应用于key和初始值:结果给出一个新值
- 新值替换lock中key的初始值
3.3. 解锁并可选择删除地图条目
此外,当线程释放锁时,我们将减少与LockWrapper关联的线程数。如果计数降至零,那么我们将从ConcurrentHashMap中删除键:
public void unlock(String key) {
LockWrapper lockWrapper = locks.get(key);
lockWrapper.lock.unlock();
if (lockWrapper.removeThreadFromQueue() == 0) {
// NB : We pass in the specific value to remove to handle the case where another thread would queue right before the removal
locks.remove(key, lockWrapper);
}
}
3.4. 概括
简而言之,让我们看看我们整个班级最终的样子:
public class LockByKey {
private static class LockWrapper {
private final Lock lock = new ReentrantLock();
private final AtomicInteger numberOfThreadsInQueue = new AtomicInteger(1);
private LockWrapper addThreadInQueue() {
numberOfThreadsInQueue.incrementAndGet();
return this;
}
private int removeThreadFromQueue() {
return numberOfThreadsInQueue.decrementAndGet();
}
}
private static ConcurrentHashMap<String, LockWrapper> locks = new ConcurrentHashMap<String, LockWrapper>();
public void lock(String key) {
LockWrapper lockWrapper = locks.compute(key, (k, v) -> v == null ? new LockWrapper() : v.addThreadInQueue());
lockWrapper.lock.lock();
}
public void unlock(String key) {
LockWrapper lockWrapper = locks.get(key);
lockWrapper.lock.unlock();
if (lockWrapper.removeThreadFromQueue() == 0) {
// NB : We pass in the specific value to remove to handle the case where another thread would queue right before the removal
locks.remove(key, lockWrapper);
}
}
}
用法和我们之前的很相似:
String key = "key";
LockByKey lockByKey = new LockByKey();
try {
lockByKey.lock(key);
// insert your code here
} finally { // CRUCIAL
lockByKey.unlock(key);
}
4.同时允许多个动作
最后但同样重要的是,让我们考虑另一种情况:我们希望将允许同时作用于同一个键的线程数限制为某个整数n,而不是一次只允许一个线程对给定键执行操作。为简单起见,我们将设置n = 2。
让我们广泛描述我们的用例:
- 第一个线程想要获取键上的锁:它将被允许这样做
- 第二个线程想要获取相同的锁:它也将被允许
- 第三个线程请求对同一个键的锁:它必须排队,直到前两个线程之一释放它的锁
信号量 就是为此而生的。semaphore是用于限制同时访问资源的线程数的对象。
全局功能和代码看起来与我们的锁非常相似:
public class SimultaneousEntriesLockByKey {
private static final int ALLOWED_THREADS = 2;
private static ConcurrentHashMap<String, Semaphore> semaphores = new ConcurrentHashMap<String, Semaphore>();
public void lock(String key) {
Semaphore semaphore = semaphores.compute(key, (k, v) -> v == null ? new Semaphore(ALLOWED_THREADS) : v);
semaphore.acquireUninterruptibly();
}
public void unlock(String key) {
Semaphore semaphore = semaphores.get(key);
semaphore.release();
if (semaphore.availablePermits() == ALLOWED_THREADS) {
semaphores.remove(key, semaphore);
}
}
}
用法相同:
String key = "key";
SimultaneousEntriesLockByKey lockByKey = new SimultaneousEntriesLockByKey();
try {
lockByKey.lock(key);
// insert your code here
} finally { // CRUCIAL
lockByKey.unlock(key);
}