Semaphore

信号量模型简单概括为:一个计数器,一个等待队列,三个方法。在信号量模型里,计数器和等待队列对外是透明的,所以只能通过信号量模型提供的三个方法来访问它们,这三个方法分别是:init()、down()和up()。

  • init():设置计数器的初始值。
  • down():计数器的值减1;如果此时计数器的值小于0,则当前线程将被阻塞,否则当前线程可以继续执行。
  • up():计数器的值加1;如果此时计数器的值小于或者等于0,则唤醒等待队列中的一个线程,并将其从等待队列中移除。

备注:init()、down()和up()三个方法都是原子性的,并且这个原子性是由信号量模型的实现保证的。

代码化的信号量模型

class Semaphore{
  // 计数器
  int count;
  // 等待队列
  Queue queue;
  // 初始化操作
  Semaphore(int c){
    this.count=c;
  }
  // 
  void down(){
    this.count--;
    if(this.count<0){
      //将当前线程插入等待队列
      //阻塞当前线程
    }
  }
  void up(){
    this.count++;
    if(this.count<=0) {
      //移除等待队列中的某个线程T
      //唤醒线程T
    }
  }
}

使用信号量保证互斥示例如下:

static int count;
//初始化信号量
static final Semaphore s 
    = new Semaphore(1);
//用信号量保证互斥    
static void addOne() {
  s.acquire();
  try {
    count+=1;
  } finally {
    s.release();
  }
}

Semaphore本质上就是一个信号计数器,用于限制同一时间的最大访问数量。

快速实现一个限流器

一个对象池的需求。所谓对象池呢,指的是一次性创建出N个对象,之后所有的线程重复利用这N个对象,当然对象在被释放前,也是不允许其他线程使用的。对象池,可以用List保存实例对象,这个很简单。但关键是限流器的设计,这里的限流,指的是不允许多于N个线程同时进入临界区。那如何快速实现一个这样的限流器呢?

实现代码如下:

import java.util.List;
import java.util.Vector;
import java.util.concurrent.Semaphore;
import java.util.function.Function;
import java.util.function.Supplier;

class ObjPool<T, R> {
    final List<T> pool;
    final Semaphore sem;

    // 构造函数,使用 Supplier 传入对象生成器
    ObjPool(int size, Supplier<T> supplier) {
        pool = new Vector<>();
        for (int i = 0; i < size; i++) {
            pool.add(supplier.get());
        }
        sem = new Semaphore(size);
    }

    // 利用对象池的对象,调用 func
    R exec(Function<T, R> func) throws InterruptedException {
        T t = null;
        sem.acquire();
        try {
            t = pool.remove(0);
            return func.apply(t);
        } finally {
            pool.add(t);
            sem.release();
        }
    }
}

// 使用示例
public class Main {
    public static void main(String[] args) throws InterruptedException {
        // 创建对象池,初始对象为 Long 类型,初始值为 2L
        ObjPool<Long, String> pool = new ObjPool<>(10, () -> 2L);
        
        // 通过对象池获取 t,之后执行
        String result = pool.exec(t -> {
            System.out.println(t);
            return t.toString();
        });
        
        System.out.println("Result: " + result);
    }
}

用Semaphore实现限流器。关键的代码是ObjPool里面的exec()方法,这个方法里面实现了限流的功能。在这个方法里面,我们首先调用acquire()方法(与之匹配的是在finally里面调用release()方法),假设对象池的大小是10,信号量的计数器初始化为10,那么前10个线程调用acquire()方法,都能继续执行,相当于通过了信号灯,而其他线程则会阻塞在acquire()方法上。对于通过信号灯的线程,我们为每个线程分配了一个对象 t(这个分配工作是通过pool.remove(0)实现的),分配完之后会执行一个回调函数func,而函数的参数正是前面分配的对象 t ;执行完回调函数之后,它们就会释放对象(这个释放工作是通过pool.add(t)实现的),同时调用release()方法来更新信号量的计数器。如果此时信号量里计数器的值小于等于0,那么说明有线程在等待,此时会自动唤醒等待的线程。

ReadWriteLock

高并发场景下重复查询数据的问题
class Cache<K,V> {
  final Map<K, V> m =
    new HashMap<>();
  final ReadWriteLock rwl = 
    new ReentrantReadWriteLock();
  final Lock r = rwl.readLock();
  final Lock w = rwl.writeLock();
 
  V get(K key) {
    V v = null;
    //读缓存
    r.lock();         ①
    try {
      v = m.get(key); ②
    } finally{
      r.unlock();     ③
    }
    //缓存中存在,返回
    if(v != null) {   ④
      return v;
    }  
    //缓存中不存在,查询数据库
    w.lock();         ⑤
    try {
      //再次验证
      //其他线程可能已经查询过数据库
      v = m.get(key); ⑥
      if(v == null){  ⑦
        //查询数据库
        v=省略代码无数
        m.put(key, v);
      }
    } finally{
      w.unlock();
    }
    return v; 
  }
}

在代码⑥⑦处,重新验证了一次缓存中是否存在,再次验证如果还是不存在,我们才去查询数据库并更新本地缓存。

原因是在高并发的场景下,有可能会有多线程竞争写锁。假设缓存是空的,没有缓存任何东西,如果此时有三个线程T1、T2和T3同时调用get()方法,并且参数key也是相同的。那么它们会同时执行到代码⑤处,但此时只有一个线程能够获得写锁,假设是线程T1,线程T1获取写锁之后查询数据库并更新缓存,最终释放写锁。此时线程T2和T3会再有一个线程能够获取写锁,假设是T2,如果不采用再次验证的方式,此时T2会再次查询数据库。T2释放写锁之后,T3也会再次查询一次数据库。而实际上线程T1已经把缓存的值设置好了,T2、T3完全没有必要再次查询数据库。所以,再次验证的方式,能够避免高并发场景下重复查询数据的问题。

读写锁类似于ReentrantLock,也支持公平模式和非公平模式。读锁和写锁都实现了 java.util.concurrent.locks.Lock接口,所以除了支持lock()方法外,tryLock()、lockInterruptibly() 等方法也都是支持的。但是有一点需要注意,那就是只有写锁支持条件变量,读锁是不支持条件变量的,读锁调用newCondition()会抛出UnsupportedOperationException异常。

ReadWriteLock如果有线程正在读,写线程需要等待读线程释放锁后才能获取写锁,即读的过程中不允许写,这是一种悲观的读锁。

StampedLock

StampedLock支持三种模式,分别是:写锁、悲观读锁和乐观读。其中,写锁、悲观读锁的语义和ReadWriteLock的写锁、读锁的语义非常类似,允许多个线程同时获取悲观读锁,但是只允许一个线程获取写锁,写锁和悲观读锁是互斥的。不同的是:StampedLock里的写锁和悲观读锁加锁成功之后,都会返回一个stamp;然后解锁的时候,需要传入这个stamp。StampedLock不支持重入另外,StampedLock的悲观读锁、写锁都不支持条件变量

StampedLock的性能之所以比ReadWriteLock还要好,其关键是StampedLock支持乐观读(该操作无锁)的方式。ReadWriteLock支持多个线程同时读,但是当多个线程同时读的时候,所有的写操作会被阻塞;而StampedLock提供的乐观读,是允许一个线程获取写锁的,也就是说不是所有的写操作都被阻塞。

class Point {
  private int x, y;
  final StampedLock sl = 
    new StampedLock();
  //计算到原点的距离  
  int distanceFromOrigin() {
    // 乐观读
    long stamp = 
      sl.tryOptimisticRead();
    // 读入局部变量,
    // 读的过程数据可能被修改
    int curX = x, curY = y;
    //判断执行读操作期间,
    //是否存在写操作,如果存在,
    //则sl.validate返回false
    if (!sl.validate(stamp)){
      // 升级为悲观读锁
      stamp = sl.readLock();
      try {
        curX = x;
        curY = y;
      } finally {
        //释放悲观读锁
        sl.unlockRead(stamp);
      }
    }
    return Math.sqrt(
      curX * curX + curY * curY);
  }
}

在distanceFromOrigin()这个方法中,首先通过调用tryOptimisticRead()获取了一个stamp,这里的tryOptimisticRead()就是前面提到的乐观读。之后将共享变量x和y读入方法的局部变量中,不过需要注意的是,由于tryOptimisticRead()是无锁的,所以共享变量x和y读入方法局部变量时,x和y有可能被其他线程修改了。因此最后读完之后,还需要再次验证一下是否存在写操作,这个验证操作是通过调用validate(stamp)来实现的。

如果线程阻塞在StampedLock的readLock()或者writeLock()上时,此时调用该阻塞线程的interrupt()方法,会导致CPU飙升,所以StampedLock一定不要调用中断操作,如果需要支持中断功能,一定使用可中断的悲观读锁readLockInterruptibly()和写锁writeLockInterruptibly()

比较与总结

并发度条件变量
ReentrantLock单个线程进入支持
ReadWriteLock多个线程进入读锁,单个线程进入写锁(写时不可读)读锁不支持
StampedLock多个线程进入读锁,单个线程进入写锁(乐观读时可写)悲观读锁、写锁都不支持条件变量
Semaphore允许多个线程同时进入