Happens-Before 规则

Happens-Before的简单定义:前面一个操作的结果对于后续的操作是可见的,即前一个操作结果可以被后续操作获取

为什么需要happen-before:JVM会对代码进行编译优化,会出现指令重排的情况。为了避免编译优化对并发编程安全性的影响,需要happens-before规则定义一些禁止编译优化的场景,保证并发编程的正确性。

happens-before八条原则如下

  • 单线程happen-before原则:在同一个线程中,书写在前面的操作happen-before后面的操作。
  • 锁的happen-before原则:同一个锁的unlock操作happen-before此锁的lock操作。
  • volatile的happen-before原则: 对一个volatile变量的写操作happen-before对此变量的任意操作。
  • happen-before的传递性原则: 如果A操作 happen-before B操作,B操作happen-before C操作,那么A操作happen-before C操作。
  • 线程启动的happen-before原则:同一个线程的start方法happen-before此线程的其它方法。
  • 线程中断的happen-before原则:对线程interrupt方法的调用happen-before被中断线程的检测到中断发送的代码。
  • 线程终结的happen-before原则:线程中的所有操作都happen-before线程的终止检测。
  • 对象创建的happen-before原则:一个对象的初始化完成先于他的finalize方法调用。
class VolatileExample {
  int x = 0;
  volatile boolean v = false;
  public void writer() {
    x = 42;
    v = true;
  }
  public void reader() {
    if (v == true) {
      // 这里x会是多少呢?
      //输出42 因为happen-before
    }
  }
}

原子性问题

原子性问题的源头是线程切换,假如同一时刻只有一个线程能够执行即【互斥】,则能够解决原子性问题。

我们将需要互斥执行的代码称为临界区。线程在进入临界区之前,首先尝试加锁lock(),如果成功,则进入临界区,此时称为这个线程持有锁;否则等待,直到持有锁的线程解锁;持有锁的线程执行完临界区的代码之后,执行解锁unlock();synchronized也同样如此,不过是java编译器会默认加锁。

关于锁

synchronized默认加锁规则如下:

  • 当修饰静态方法时锁的是Class对象
  • 当修饰非静态方法时锁的是当前实例对象

保护有关联关系的多个资源

我们可以使用一把锁来管理保护多个资源例如:关于转账的取款、查看余额、修改密码、查看密码,但是这样四个操作都将是穿行的降低程序性能。因此用不同的锁来对收保护的资源进行精细化管理,能够提升性能。这种锁就叫细粒度锁。

如果多个资源有关联关系。例如例如银行业务里面的转账操作,账户A减少100元,账户B增加100元。这两个账户就是有关联关系的。那对于像转账这种有关联关系的操作,我们应该怎么去解决呢?先把这个问题代码化。我们声明了个账户类:Account,该类有一个成员变量余额:balance,还有一个用于转账的方法:transfer(),然后怎么保证转账操作transfer()没有并发问题呢?

class Account {
  private int balance;
  // 转账
  void transfer(
      Account target, int amt){
    if (this.balance > amt) {
      this.balance -= amt;
      target.balance += amt;
    }
  } 
}

错误示例:给transfer加synchronized。

错误原因:锁的是单个实例对象即转账对象不是参与转账的两方对象。问题就出在this这把锁上,this这把锁可以保护自己的余额this.balance,却保护不了别人的余额target.balance,线程1锁定的是账户A的实例(A.this),而线程2锁定的是账户B的实例(B.this),所以这两个线程可以同时进入临界区transfer()。同时进入临界区的结果是什么呢?线程1和线程2都会读到账户B的余额为200,导致最终账户B的余额可能是300(线程1后于线程2写B.balance,线程2写的B.balance值被线程1覆盖),可能是100(线程1先于线程2写B.balance,线程1写的B.balance值被线程2覆盖),就是不可能是200。

改进代码方案如下:即用两个不同的锁即细粒度锁同时锁住代码,但同时以下代码可能造成死锁的风险

class Account {
  private int balance;
  // 转账
  void transfer(Account target, int amt){
    // 锁定转出账户
    synchronized(this) {              
      // 锁定转入账户
      synchronized(target) {           
        if (this.balance > amt) {
          this.balance -= amt;
          target.balance += amt;
        }
      }
    }
  } 
}

关于死锁

造成死锁的四个条件:

  • 互斥,共享资源X和Y只能被一个线程占用;
  • 占有且等待,线程T1已经取得共享资源X,在等待共享资源Y的时候,不释放共享资源X;
  • 不可抢占,其他线程不能强行抢占线程T1占有的资源;
  • 循环等待,线程T1等待线程T2占有的资源,线程T2等待线程T1占有的资源,就是循环等待。

破坏死锁

我们只要破坏四个条件中的其中一个即可避免死锁

破坏占用且等待条件,即一次性申请所有资源,具体代码如下

class Allocator {
  private List<Object> als =
    new ArrayList<>();
  // 一次性申请所有资源
  synchronized boolean apply(
    Object from, Object to){
    if(als.contains(from) ||
         als.contains(to)){
      return false;  
    } else {
      als.add(from);
      als.add(to);  
    }
    return true;
  }
  // 归还资源
  synchronized void free(
    Object from, Object to){
    als.remove(from);
    als.remove(to);
  }
}

class Account {
  // actr应该为单例
  private Allocator actr;
  private int balance;
  // 转账
  void transfer(Account target, int amt){
    // 一次性申请转出账户和转入账户,直到成功
    while(!actr.apply(this, target))
      ;
    try{
      // 锁定转出账户
      synchronized(this){              
        // 锁定转入账户
        synchronized(target){           
          if (this.balance > amt){
            this.balance -= amt;
            target.balance += amt;
          }
        }
      }
    } finally {
      actr.free(this, target)
    }
  } 
}

破坏循环等待条件,具体代码如下

class Account {
  private int id;
  private int balance;
  // 转账
  void transfer(Account target, int amt){
    Account left = this        ①
    Account right = target;    ②
    if (this.id > target.id) { ③
      left = target;           ④
      right = this;            ⑤
    }                          ⑥
    // 锁定序号小的账户
    synchronized(left){
      // 锁定序号大的账户
      synchronized(right){ 
        if (this.balance > amt){
          this.balance -= amt;
          target.balance += amt;
        }
      }
    }
  } 
}

活锁

活锁指的是任务或者执行者没有被阻塞,由于某些条件没有满足,导致一直重复尝试,失败,尝试,失败。 活锁和死锁的区别在于,处于活锁的实体是在不断的改变状态,所谓的“活”, 而处于死锁的实体表现为等待;活锁有可能自行解开,死锁则不能。
为解决活锁可以引入一些随机性,例如如果检测到冲突,那么就暂停随机的一定时间进行重试。这回大大减少碰撞的可能性。

性能

  1. 使用锁会带来性能问题,最好的方案是使用无锁的算法和数据结构例如线程本地存储(ThreadLocal)、写时复制(cope-on-write)、乐观锁等。
  2. 减少锁的使用时间。互斥锁的本质上是将并行的程序串行化,所以要增加并行度。例如细粒度的锁(CurrentHashMap)使用分段锁、读写锁即读是无锁,只有写才会互斥。

补充性能三个度量指标:吞吐量、延迟和并发量

  1. 吞吐量:指的是单位时间内能处理的请求数量。吞吐量越高,说明性能越好。
  2. 延迟:指的是从发出请求到收到响应的时间。延迟越小,说明性能越好。
  3. 并发量:指的是能同时处理的请求数量,但一般来说随着并发量的增加、延迟也会增加。