05Lock下锁的实现介绍
Semaphore
信号量模型简单概括为:一个计数器,一个等待队列,三个方法。在信号量模型里,计数器和等待队列对外是透明的,所以只能通过信号量模型提供的三个方法来访问它们,这三个方法分别是:init()、down()和up()。
- init():设置计数器的初始值。
- down():计数器的值减1;如果此时计数器的值小于0,则当前线程将被阻塞,否则当前线程可以继续执行。
- up():计数器的值加1;如果此时计数器的值小于或者等于0,则唤醒等待队列中的一个线程,并将其从等待队列中移除。
备注:init()、down()和up()三个方法都是原子性的,并且这个原子性是由信号量模型的实现保证的。
代码化的信号量模型
class Semaphore{
// 计数器
int count;
// 等待队列
Queue queue;
// 初始化操作
Semaphore(int c){
this.count=c;
}
//
void down(){
this.count--;
if(this.count<0){
//将当前线程插入等待队列
//阻塞当前线程
}
}
void up(){
this.count++;
if(this.count<=0) {
//移除等待队列中的某个线程T
//唤醒线程T
}
}
}
使用信号量保证互斥示例如下:
static int count;
//初始化信号量
static final Semaphore s
= new Semaphore(1);
//用信号量保证互斥
static void addOne() {
s.acquire();
try {
count+=1;
} finally {
s.release();
}
}
Semaphore本质上就是一个信号计数器,用于限制同一时间的最大访问数量。
快速实现一个限流器
一个对象池的需求。所谓对象池呢,指的是一次性创建出N个对象,之后所有的线程重复利用这N个对象,当然对象在被释放前,也是不允许其他线程使用的。对象池,可以用List保存实例对象,这个很简单。但关键是限流器的设计,这里的限流,指的是不允许多于N个线程同时进入临界区。那如何快速实现一个这样的限流器呢?
实现代码如下:
import java.util.List;
import java.util.Vector;
import java.util.concurrent.Semaphore;
import java.util.function.Function;
import java.util.function.Supplier;
class ObjPool<T, R> {
final List<T> pool;
final Semaphore sem;
// 构造函数,使用 Supplier 传入对象生成器
ObjPool(int size, Supplier<T> supplier) {
pool = new Vector<>();
for (int i = 0; i < size; i++) {
pool.add(supplier.get());
}
sem = new Semaphore(size);
}
// 利用对象池的对象,调用 func
R exec(Function<T, R> func) throws InterruptedException {
T t = null;
sem.acquire();
try {
t = pool.remove(0);
return func.apply(t);
} finally {
pool.add(t);
sem.release();
}
}
}
// 使用示例
public class Main {
public static void main(String[] args) throws InterruptedException {
// 创建对象池,初始对象为 Long 类型,初始值为 2L
ObjPool<Long, String> pool = new ObjPool<>(10, () -> 2L);
// 通过对象池获取 t,之后执行
String result = pool.exec(t -> {
System.out.println(t);
return t.toString();
});
System.out.println("Result: " + result);
}
}
用Semaphore实现限流器。关键的代码是ObjPool里面的exec()方法,这个方法里面实现了限流的功能。在这个方法里面,我们首先调用acquire()方法(与之匹配的是在finally里面调用release()方法),假设对象池的大小是10,信号量的计数器初始化为10,那么前10个线程调用acquire()方法,都能继续执行,相当于通过了信号灯,而其他线程则会阻塞在acquire()方法上。对于通过信号灯的线程,我们为每个线程分配了一个对象 t(这个分配工作是通过pool.remove(0)实现的),分配完之后会执行一个回调函数func,而函数的参数正是前面分配的对象 t ;执行完回调函数之后,它们就会释放对象(这个释放工作是通过pool.add(t)实现的),同时调用release()方法来更新信号量的计数器。如果此时信号量里计数器的值小于等于0,那么说明有线程在等待,此时会自动唤醒等待的线程。
ReadWriteLock
高并发场景下重复查询数据的问题
class Cache<K,V> {
final Map<K, V> m =
new HashMap<>();
final ReadWriteLock rwl =
new ReentrantReadWriteLock();
final Lock r = rwl.readLock();
final Lock w = rwl.writeLock();
V get(K key) {
V v = null;
//读缓存
r.lock(); ①
try {
v = m.get(key); ②
} finally{
r.unlock(); ③
}
//缓存中存在,返回
if(v != null) { ④
return v;
}
//缓存中不存在,查询数据库
w.lock(); ⑤
try {
//再次验证
//其他线程可能已经查询过数据库
v = m.get(key); ⑥
if(v == null){ ⑦
//查询数据库
v=省略代码无数
m.put(key, v);
}
} finally{
w.unlock();
}
return v;
}
}
在代码⑥⑦处,重新验证了一次缓存中是否存在,再次验证如果还是不存在,我们才去查询数据库并更新本地缓存。
原因是在高并发的场景下,有可能会有多线程竞争写锁。假设缓存是空的,没有缓存任何东西,如果此时有三个线程T1、T2和T3同时调用get()方法,并且参数key也是相同的。那么它们会同时执行到代码⑤处,但此时只有一个线程能够获得写锁,假设是线程T1,线程T1获取写锁之后查询数据库并更新缓存,最终释放写锁。此时线程T2和T3会再有一个线程能够获取写锁,假设是T2,如果不采用再次验证的方式,此时T2会再次查询数据库。T2释放写锁之后,T3也会再次查询一次数据库。而实际上线程T1已经把缓存的值设置好了,T2、T3完全没有必要再次查询数据库。所以,再次验证的方式,能够避免高并发场景下重复查询数据的问题。
读写锁类似于ReentrantLock,也支持公平模式和非公平模式。读锁和写锁都实现了 java.util.concurrent.locks.Lock接口,所以除了支持lock()方法外,tryLock()、lockInterruptibly() 等方法也都是支持的。但是有一点需要注意,那就是只有写锁支持条件变量,读锁是不支持条件变量的,读锁调用newCondition()会抛出UnsupportedOperationException异常。
ReadWriteLock如果有线程正在读,写线程需要等待读线程释放锁后才能获取写锁,即读的过程中不允许写,这是一种悲观的读锁。
StampedLock
StampedLock支持三种模式,分别是:写锁、悲观读锁和乐观读。其中,写锁、悲观读锁的语义和ReadWriteLock的写锁、读锁的语义非常类似,允许多个线程同时获取悲观读锁,但是只允许一个线程获取写锁,写锁和悲观读锁是互斥的。不同的是:StampedLock里的写锁和悲观读锁加锁成功之后,都会返回一个stamp;然后解锁的时候,需要传入这个stamp。StampedLock不支持重入,另外,StampedLock的悲观读锁、写锁都不支持条件变量
StampedLock的性能之所以比ReadWriteLock还要好,其关键是StampedLock支持乐观读(该操作无锁)的方式。ReadWriteLock支持多个线程同时读,但是当多个线程同时读的时候,所有的写操作会被阻塞;而StampedLock提供的乐观读,是允许一个线程获取写锁的,也就是说不是所有的写操作都被阻塞。
class Point {
private int x, y;
final StampedLock sl =
new StampedLock();
//计算到原点的距离
int distanceFromOrigin() {
// 乐观读
long stamp =
sl.tryOptimisticRead();
// 读入局部变量,
// 读的过程数据可能被修改
int curX = x, curY = y;
//判断执行读操作期间,
//是否存在写操作,如果存在,
//则sl.validate返回false
if (!sl.validate(stamp)){
// 升级为悲观读锁
stamp = sl.readLock();
try {
curX = x;
curY = y;
} finally {
//释放悲观读锁
sl.unlockRead(stamp);
}
}
return Math.sqrt(
curX * curX + curY * curY);
}
}
在distanceFromOrigin()这个方法中,首先通过调用tryOptimisticRead()获取了一个stamp,这里的tryOptimisticRead()就是前面提到的乐观读。之后将共享变量x和y读入方法的局部变量中,不过需要注意的是,由于tryOptimisticRead()是无锁的,所以共享变量x和y读入方法局部变量时,x和y有可能被其他线程修改了。因此最后读完之后,还需要再次验证一下是否存在写操作,这个验证操作是通过调用validate(stamp)来实现的。
如果线程阻塞在StampedLock的readLock()或者writeLock()上时,此时调用该阻塞线程的interrupt()方法,会导致CPU飙升,所以StampedLock一定不要调用中断操作,如果需要支持中断功能,一定使用可中断的悲观读锁readLockInterruptibly()和写锁writeLockInterruptibly()
比较与总结
锁 | 并发度 | 条件变量 |
---|---|---|
ReentrantLock | 单个线程进入 | 支持 |
ReadWriteLock | 多个线程进入读锁,单个线程进入写锁(写时不可读) | 读锁不支持 |
StampedLock | 多个线程进入读锁,单个线程进入写锁(乐观读时可写) | 悲观读锁、写锁都不支持条件变量 |
Semaphore | 允许多个线程同时进入 |