01.Java并发编程
# 管程
Java 中提供的并发包都是以管程技术为基础的,管程就是一把解决并发问题的万能钥匙
Java 采用的管程技术在哪里体现了呢?synchronized 关键字以及 wait()、notify()、notifyAll() 都是管程的组成部分
管程解决互斥问题的思路:将共享变量以及对共享变量的操作统一封装起来
# synchronized如何保证同步呢?
同步操作主要由两个 jvm 指令实现:monitorenter、monitorexit
对于下边代码:
public class LockMain {
public synchronized void insert() {
System.out.println("synchronized 方法");
}
public void select() {
synchronized (this) {
System.out.println("synchronized 块");
}
}
}
在该类所在的路径,打开命令行执行:
# 先编译成字节码
javac .\LockMain.java
# 再通过 javap 指令反编译出来 JVM 指令
# -v 可以输出更多详细信息
javap -v .\LockMain.class
反编译后,两个方法的 JVM 指令如下:
public synchronized void insert();
descriptor: ()V
flags: ACC_PUBLIC, ACC_SYNCHRONIZED
Code:
0: getstatic #7 // Field java/lang/System.out:Ljava/io/PrintStream;
3: ldc #13 // String synchronized 方法
5: invokevirtual #15 // Method java/io/PrintStream.println:(Ljava/lang/String;)V
8: return
public void select();
descriptor: ()V
flags: ACC_PUBLIC
Code:
0: aload_0
1: dup
2: astore_1
3: monitorenter // monitorenter 指令进入同步代码块
4: getstatic #7 // Field java/lang/System.out:Ljava/io/PrintStream;
7: ldc #21 // String synchronized 块
9: invokevirtual #15 // Method java/io/PrintStream.println:(Ljava/lang/String;)V
12: aload_1
13: monitorexit // monitorexit 指令退出同步代码块
14: goto 22
17: astore_2
18: aload_1
19: monitorexit // monitorexit 指令退出同步代码块
20: aload_2
21: athrow
22: return
synchronized 加在方法上,可以看到 insert 方法的 flags 有一个
ACC_SYNCHRONIZED
关键字,那么 JVM 进行方法调用时,发现该关键字,就会先获取锁,再执行方法,底层也是基于 monitorenter 和 monitorexit 实现的synchronized 加在代码块上,有一个 monitorenter 对应了两个 monitorexit,这是因为编译器会为同步代码块添加一个隐式的
try-finally
,在 finally 中也会调用 monitorexit 释放锁
# 面试杀手锏:全面剖析 synchronized 锁升级流程
在准备面试时,一定要准备杀手锏,也就是你了解最深的几个内容,这样才能给面试官很深的印象
而 Java 并发相关的内容拿来做杀手锏非常合适,因为并发是基础性的内容,大家都学过,但是大家学的深度又参差不齐,因此通过并发相关内容就可以看出来面试者的水平高低
因此并发相关的内容一定要深入原理,好好学习一下!
JDK1.6 之前,synchronized 使用重量级锁,性能开销很高
JDK1.6 引入了锁的优化:偏向锁和轻量级锁
同步锁共有 4 个状态:无锁、偏向锁、轻量级锁、重量级锁
,这 4 个状态会随着竞争激烈而逐渐升级
synchronized 的锁的状态是记录在对象的 Mark Word 中的,因此这里需要先介绍一些 Mark Word
# 对象头 Mark Word
在 64 位 JVM 中,Mark Word 的长度为 64 位
- Mark Word 到底是 Java 中对象的哪一部分呢?
在 Java 中,每个对象在堆空间中存储是分为了 3 个部分:对象头、实例数据、对齐填充,对象头中就是存储一些对象的标识信息
而 Mark Word 就是在对象头中进行存储
Mark Word 的作用就是表示对象的线程锁的状态,并且存放对象的 hashCode
我们上边说了 Mark Word 是 64 位的,接下来看一下 Mark Word 如何分配这 64 位的空间
如下图,Mark Word 在以下共 5 种状态下的组成是不一样的,这里来介绍一下:
- 无锁时的 Mark Word
在对象没有锁的时候,我们可以看到 Mark Word 中,前 25bit 没有用到
接下来的 29bit 存储了对象的 HashCode,以及使用 4bit 存储了对象的分代年龄
1bit 存储非偏向锁状态为 0
2bit 存储锁标志位 01,这是固定的
- 有偏向锁的 Mark Word
当对象 被偏向锁锁住
的时候,会使用前 54bit 指向了持有该对象锁的线程,Epoch 代表了偏向锁的版本戳
偏向锁的标志被设置为 1,锁标志位仍然为 01
- 有轻量级锁的 Mark Word
当对象 被轻量级锁锁住
的时候,Mark Word 中的组成又发生了变化
前 62bit 指向了线程栈帧中的 LockRecord
当一个线程持有当前对象锁,并且是轻量级锁的时候,会在线程栈中该线程的栈帧里创建一个 LockRecord 对象(下边在升级轻量级锁的时候会细讲)
你可以先简单理解为这个 LockRecord 就是线程持有该轻量级锁的一个标志
最后 2bit 的锁标志改为 00,表示轻量级锁
- 有重量级锁的 Mark Word
当对象被 重量级锁锁住
的时候,前 62bit 会指向重量级锁,下边升级重量级锁会介绍
最后 2bit 的锁标志改为 10,表示重量级锁
- 有 GC 标志的 Mark Word
这里了解一下就好了,被 GC 标记过的 Mark Word 就会将锁标志位改为 11
有了 Mark Word 的基础,接下来就可以学习 synchronized 的锁升级过程了!
# 偏向锁
当一个线程第一次竞争到锁,则拿到的就是偏向锁,此时不存在其他线程的竞争,因此偏向锁的性能是很高的,他会偏向第一个访问锁的线程
偏向锁的获取就是通过 CAS 操作将锁对象的 MarkWord 中的 ThreadId 修改为当前线程 ID
当持有偏向锁的线程再来访问的话,可以直接访问,不需要触发同步,连 CAS 操作都不需要
为什么要引入偏向锁?
偏向锁是 HotSpot 虚拟机的一项优化技术,可以提升单线程对同步代码块的访问性能
受益于偏向锁的应用程序往往是使用了早期 Java 集合的程序(JDK1.1),即 HashTable、Vector,在每次访问的时候都是 线程同步操作
而之后,出现了新的高性能并发数据结构 ConcurrentHashMap,使用偏向锁带来的性能提升就不明显了
Java 团队更推荐使用 轻量级锁
或者 重量级锁
:
- 如果竞争不激烈的话,并且每个线程对锁持有时间较短的情况下,可以使用轻量级锁,也就是 CAS 自旋等待获取锁
- 如果竞争激烈的情况下,或者每个线程持有锁的时间很长,如果还是用 CAS 自旋会导致大量线程在空转,大量占用 CPU 资源,因此要使用重量级锁
为什么在 JDK15 之后就将偏向锁废弃掉了呢?(偏向锁撤销性能差)
偏向锁在 JDK 15 之后就被废弃掉了,上边已经说了,偏向锁在之后的 JDK 中带来性能提升就不明显了,主要原因其实还是 偏向锁的撤销
性能较差
偏向锁只会出现在只有一个线程访问同步代码块时,只要此时有其他线程来访问,偏向锁就会撤销,进而升级为轻量级锁
那么在 高并发
场景下,基本上不会出现只有一个线程访问同步代码块的情况
因此会出现 偏向锁撤销
的情况,而偏向锁撤销需要等待进入全局安全点(safepoint)时,才会撤销,在 safepoint 时,所有的线程都暂停工作,因此偏向锁撤销的性能很差
# 轻量级锁
如果有线程竞争的话,偏向锁升级为轻量级锁
轻量级锁是通过 CAS 去自旋
获取锁,适用于并发竞争不激烈,并且持锁时间较短的情况
当一个线程获取锁之后,其他线程只能自旋等待,不会阻塞,自选等待也就是空转占用 CPU,因此如果自旋时间很长,可以想象到对 CPU 很不友好
接下来说一下线程竞争轻量级锁的过程
当进入到 synchronized 代码块中,虚拟机会现在当前线程的栈帧中创建一个 Lock Record
的空间,用于存储当前锁对象的 Mark Word 拷贝,官方称这个拷贝的 Mark Word 为 Displaced Mark Word
如果当前线程 开始抢占
该锁,那么会先将锁对象的 Mark Word 复制到当前线程栈帧的 Lock Record 中去, 之后再通过 CAS 操作尝试将锁对象的 Lock Record 指针指向当前线程栈帧中的 Lock Record ,并且将栈帧中的 owner 指针指向对锁对象的 Mark Word
如果通过 CAS 操作更新成功了,就说明该线程抢占到了该锁,将锁对象 Mark Word 的锁标志设置为 00,表示是轻量级锁
如果通过 CAS 操作更新失败,会检查 Mark Word 的 Lock Record 指针是否指向当前线程的栈帧,如果是的话,表明当前线程抢到了锁,可以直接进入,否则会自旋等待
这里说一下为什么需要设计 Lock Record 这个对象再去存储锁对象的 Mark Word
其实是 为了存储锁对象的 HashCode
,对象在无锁状态下 HashCode 会存储在 Mark Word 中
但是当锁状态升级为偏向锁之后,原本存储 HashCode 的位置需要存储 持有锁的线程信息
,因此这也是偏向锁不能和 HashCode 同时存在的原因
因此,为了保存对象的 HashCode 信息,在对象上的锁撤销之后,恢复到 无锁状态时
,可以再将对象的 HashCode 信息给写到 Mark Word 上,设计出来了 Lock Record
在给对象加轻量级锁的时候,先将对象的 Mark Word 复制到线程的 Lock Record 中,此时 Lock Record 中保存了对象的 HashCode
在对象上的锁被释放之后,再将 Lock Record 中的 HashCode 赋值给对象的 Mark Word 即可
- 这里说一下自适应自旋:
自旋锁在 JDK 1.4 中就已经引入了,在之前的版本中,自旋次数默认是 10 次(一般认为 10 次自旋大概等于线程挂起的开销)
在 JDK1.6 之后对自旋锁进一步优化,进入了 自适应自旋
自适应自旋: 自旋的次数不再是一个固定的值了,而是由前一次在该锁上的 自旋时间
及 锁的拥有者的状态
来决定的
- 如果在当前锁对象上,上次就自旋成功获取锁了,那么当前线程来获取锁时,就会认为自选成功的概率比较大,因此允许自旋相对更长的时间来获取锁
- 如果在当前锁对象上,很少通过自旋获取锁,那么之后在获取锁的时候,可能就直接跳过自旋了
# 重量级锁
当 CAS 自旋达到一定次数,就会升级为重量级锁,避免长时间的 CAS 资源耗费 CPU 性能
在重量级锁中,当线程发现锁已经被占用了,就会将自己挂起,而不是一直占用 CPU 进行空转
而重量级锁的 性能不高的原因
就是因为要不断挂起、唤醒线程,进行线程状态的变更, Hostspot 虚拟机采用 内核线程
实现线程模型,也就是说 Java 线程都是直接映射到操作系统线程的,因此线程相关的操作都需要在内核态由操作系统执行,导致了用户态切和内核态之间的切换,因此重量级锁性能不高!
- 先说一下 Monitor 的概念:
每个 Java 对象都与一个 Monitor 相关联,Monitor 的主要目的是确保在任何给定时间,只有一个线程能够执行与特定对象相关联的临界区代码。Monitor 是通过对象头(Object Header)和内置锁(Intrinsic Lock)来实现的,在 JVM 中 Monitor 的具体实现是 ObjectMonitor
- 接下来说一下,重量级锁的抢占流程(了解一下整体的流程就行,这点的代码都已经是 JVM 层面的了,比较底层):
1、当线程准备要抢占重量级锁时,会创建一个 ObjectMonitor (JVM 代码中的 ObjectMonitor)的对象,里边有两个队列 EntryList、WaitSet
2、如果该锁正在被其他线程使用,当前线程会先进入到 EntryList 队列中
3、当重量级锁被释放之后,JVM 会指定 EntryList 队列头部的第一个线程为 OnDeck Thread,也就是准备拿到锁的线程
4、如果持有锁的线程被 Object.wait() 方法阻塞,就会转移到 WaitSet 队列,等待被 notify() 或 notifyAll() 唤醒之后进入到 EntryList 队列中
这里说一下重量级锁的抢占是 非公平锁
,因为线程来抢占重量级锁之前都会先通过 CAS 自选获取锁,如果获取不到了才会进入到队列等待获取重量级锁,因此这对于队列中的线程是不公平的!
当获取了重量级锁之后,就会将锁对象中的 锁指针
指向 ObjectMonitor 对象
# 总结
最后总结一下
无锁升级到偏向锁
比较简单,将对象的 Mark Word 指向当前线程即可
一旦发生多线程竞争锁,就会升级到 轻量级锁
,这个抢占过程是通过 CAS 来完成的,主要是通过 CAS 更新锁对象的 Mark Word 值,更新成功说明抢占到了轻量级锁
如果 CAS 达到一定次数,会升级到 重量级锁
,这个过程是多个线程在 ObjectMonitor 的阻塞队列中进行排队的
# synchronized 深入剖析
# 面试官:synchronized 关键字可以保证可见性吗?
synchronized 是可以保证可见性的
先介绍一下可见性是什么东西:
线程要修改一个变量,这个变量是在主内存中存储的,线程修改时,要先去主内存读取一份到自己的工作内存中,这个工作内存是线程私有的,其他线程看不到,因此如果当前线程修改完毕,没有及时刷新到主内存,或者其他线程读取的时候,没有及时去主内存中读取最新值,就会导致出现数据的不一致问题,也就是数据的不可见,那么保证数据的可见性,就是要保证多个线程中的数据一致性,避免其中一个线程修改变量之后,其他线程看不见变量的更新!
接下来说一下可见性的保证:
先从底层说起,可见性的保证,在底层其实是通过 MESI 协议来保证的,也就是保证多个处理器(CPU)和主内存之间的数据一致性,从而保证在操作系统层面上,多个线程之间对数据的更新是可见的
这里说一下 MESI 如何保证数据一致性,只简单说一下,毕竟我们不是专攻底层的
在 MESI 协议中,主要有 两个关键机制
来保证数据的一致性:flush 和 refresh
- flush
将自己更新的值刷新到高速缓存里去,让其他处理器在后续可以通过一些机制从自己的高速缓存里读到更新后的值
并且还会给其他处理器发送一个 flush 消息,让其他处理器将对应的缓存行标记为无效,确保其他处理器不会读到这个变量的过时版本
- refresh
处理器中的线程在读取一个变量的值的时候,如果发现其他处理器的线程更新了变量的值,必须从其他处理器的高速缓存(或者是主内存)里,读取这个最新的值,更新到自己的高速缓存中
上边是硬件级别保证可见性的原理,那么在上层保证可见性其实是基于内存屏障来做的,内存屏障的作用可以理解为强制去读取最新值以及将最新值刷回主内存,也就是有内存屏障的地方会强制线程去执行 refresh 和 flush 动作,从而保证数据的一致性
synchronized 保证可见性:
那么 synchronized 保证可见性其实也就是通过 内存屏障
来保证的,在进入 synchronized 代码块和退出的时候,都会插入内存屏障,目的就是保证在进入的时候,强制去执行 refresh 操作,这样可以保证读取到变量的最新值,而在退出 synchronized 代码块时,也就是对变量的修改已经完成了,此使强制去执行 flush 操作,可以保证将变量的最新值给刷新到主内存中去
如下,在 synchronized 中添加的内存屏障:
int b = 0;
int c = 0;
synchronized (this) { --> monitorenter
--> Load 内存屏障
--> Acquire 内存屏障
int a = b;
c = 1;
--> Release 内存屏障
} --> monitorexit
--> Store 内存屏障
Acquire 屏障 = LoadLoad + LoadStore
- Acquire 屏障确保一个线程在执行到屏障之后的内存操作之前,能看到其他线程在屏障之前的所有内存操作的结果
Release 屏障 = LoadStore + StoreStore
- Release 屏障用于确保一个线程在执行到屏障之后的内存操作之前,其他线程能看到该线程在屏障之前的所有内存操作的结果
这里再介绍一下 JVM 中的内存屏障,也不用都背会,了解内存屏障这个东西就可以了,背会其实没有意义
JMM 中有 4 类内存屏障
:(Load 操作是从主内存加载数据,Store 操作是将数据刷新到主内存)
LoadLoad
:确保该内存屏障前的 Load 操作先于屏障后的所有 Load 操作。对于屏障前后的 Store 操作并无影响屏障类型StoreStore
:确保该内存屏障前的 Store 操作先于屏障后的所有 Store 操作。对于屏障前后的Load操作并无影响LoadStore
:确保屏障指令之前的所有Load操作,先于屏障之后所有 Store 操作StoreLoad
:确保屏障之前的所有内存访问操作(包括Store和Load)完成之后,才执行屏障之后的内存访问操作。全能型屏障,会屏蔽屏障前后所有指令的重排
# 为什么不建议在高并发场景下使用 synchronized?
这首先我们要了解 高并发场景的特点
以及 synchronized 底层加锁的原理
是怎样的!
首先说一下 synchronized 底层加锁的原理:
synchronized 在 JDK1.6 之后引入了锁的优化,随着多线程竞争的激烈程度不同,使用的锁也不同
当没有线程竞争,此时为
无锁
状态如果只有一个线程不停访问同步代码块,此时会使用
偏向锁
如果有两个以上线程并发访问,偏向锁会撤销,并升级为
轻量级锁
(偏向锁在 JDK15 之后就被废弃了,因为撤销带来性能开销比较大)如果在轻量锁 CAS 自旋达到一定次数还没有拿到锁,就会撤销轻量级锁,升级为
重量级锁
,其实重量级锁的开销是比较大的,因为底层涉及到
在高并发场景下,并发度肯定是比较高的,不建议使用 synchronized 的原因主要有以下几点:
- 由于并发度比较高,因此 synchronized 一定会升级到重量级锁,但是重量级锁的性能是不太高的,因为线程要阻塞再唤醒,需要用户态和内核态之间切换
- synchronized 没有读写锁优化
- synchronized 不能对线程唤醒,也就是你线程如果获取不到锁的话会一直阻塞
在使用 synchronized 的时候,一定要 直接将偏向锁给禁掉 ,因为大多数情况下,偏向锁都需要撤销升级为轻量级锁,而偏向锁的撤销性能是比较差的!
所以如果优化的话,对于第一个点来说,将等待线程阻塞再唤醒,个人感觉优化空间不大
第二个点就是读写锁的优化,读读之间不互斥,大幅度增强 读多写少
场景下的性能!
第三个点就是需要一个 tryLock(timeout)
功能,在指定时间获取不到锁的时候,可以直接将线程超时了,不去拿锁了
- 为什么说需要
tryLock(timeout)
这个功能呢?
假设这样一种场景,有一个任务在某个时间点可能多个线程同时要来执行,但是只要有一个线程执行完毕之后,其他线程就不需要执行了
那么假设在这个需要执行任务的时间点,大量线程同时过来执行,也就是大量线程都进入阻塞队列等待获取锁,第一个线程拿到锁执行任务之后,此时后边的线程都不需要执行该任务了,但是由于没有这个超时功能,导致后边的线程还需要在队列中等待获取锁,再一个个进入同步代码块,发现任务已经执行过了,不需要自己再执行了,之后再退出同步代码块
因此这个 tryLock(timeout)
的作用就是 将大量线程的串行操作转为并行操作 ,大量线程发现指定时间内获取不了锁了,直接超时,不获取锁了,这样后边的线程再来看就发现任务已经执行过了,不需要再去获取锁执行任务了
这里 tryLock(timeout)
的情况只是举一个特殊的情况,其实是参考了分布式环境下,更新 Redis 缓存时会出现这种情况,但是在分布式环境下肯定不会使用 synchronized ,因此这里主要是举个例子说一下 tryLock 的作用!
上边主要说了 synchronized 的缺点,一方面是为了应对面试,另一方面也可以通过各种问题来引发自己的思考,让自己对 synchronized 的理解更加深入
一般在写项目使用分布式锁还是多一些,毕竟高并发项目肯定不会使用单节点部署
而单机项目的话,一般也不会追求极致的性能,使用 synchronized 也没有什么问题
# volatile
synchronized 在多线程场景下存在性能问题
而 volatile
关键字是一个更轻量级的线程安全解决方案
volatile 关键字的作用:保证多线程场景下变量的可见性和有序性
- 可见性:保证此变量的修改对所有线程的可见性。
- 有序性:禁止指令重排序优化,编译器和处理器在进行指令优化时,不能把在 volatile 变量操作(读/写)后面的语句放到其前面执行,也不能将volatile变量操作前面的语句放在其后执行。遵循了JMM 的 happens-before 规则
线程写 volatile 变量的过程:
- 改变线程本地内存中volatile变量副本的值;
- 将改变后的副本的值从本地内存刷新到主内存
线程读 volatile 变量的过程:
- 从主内存中读取volatile变量的最新值到线程的本地内存中
- 从本地内存中读取volatile变量的副本
# volatile 实现原理
如果面试中问到了 volatile 关键字,应该从 Java 内存模型开始讲解,再说到原子性、可见性、有序性是什么
之后说 volatile 解决了有序性
和可见性
,但是并不解决原子性
volatile 可以说是 Java 虚拟机提供的最轻量级的同步机制,在很多开源框架中,都会大量使用 volatile 保证并发下的有序性和可见性
volatile 实现
可见性
和有序性
就是基于内存屏障
的:
内存屏障是一种 CPU 指令
,用于控制特定条件下的重排序和内存可见性问题
- 写操作时,在写指令后边加上 store 屏障指令,让线程本地内存的变量能立即刷到主内存中
- 读操作时,在读指令前边加上 load 屏障指令,可以及时读取到主内存中的值
JMM 中有 4 类内存屏障
:(Load 操作是从主内存加载数据,Store 操作是将数据刷新到主内存)
LoadLoad:确保该内存屏障前的 Load 操作先于屏障后的所有 Load 操作。对于屏障前后的 Store 操作并无影响屏障类型
StoreStore:确保该内存屏障前的 Store 操作先于屏障后的所有 Store 操作。对于屏障前后的Load操作并无影响
LoadStore:确保屏障指令之前的所有Load操作,先于屏障之后所有 Store 操作
StoreLoad:确保屏障之前的所有内存访问操作(包括Store和Load)完成之后,才执行屏障之后的内存访问操作。全能型屏障,会屏蔽屏障前后所有指令的重排
在字节码层面上,变量添加 volatile 之后,读取和写入该变量都会加入内存屏障:
读取 volatile 变量时,在后边添加内存屏障,不允许之后的操作重排序到读操作之前
volatile变量读操作
LoadLoad
LoadStore
写入 volatile 变量时,前后加入内存屏障,不允许写操作的前后操作重排序
LoadStore
StoreStore
volatile变量写操作
StoreLoad
volatile 的缺陷就是不能保证变量的原子性
解决方案:可以通过加锁或者 AtomicInteger
原子操作类来保证该变量操作时的原子性
public static AtomicInteger count = new AtomicInteger(0);
# CAS
同步组件中大量使用 CAS 技术实现了 Java 多线程的并发操作。整个 AQS 同步组件、Atomic 原子类操作等等都是以 CAS 实现的
Java 中 ConcurrentHashMap 在 jdk1.8 的版本中也调整为了 CAS+Synchronized。可以说 CAS 是整个 JUC 的基石
CAS 操作主要涉及 3 个操作数:
- V:要写的内存地址
- E:预期值
- N:新写入的值
当内存地址的值等于预期值时,将该内存地址的值写为新的值
Java 中的 CAS 通过 Unsafe
类实现
CAS 缺陷:
循环时间过长:如果 CAS 自旋一直不成功,会给 CPU 带来很大开销
只能针对一个共享变量
存在 ABA 问题:CAS 只检查了值有没有发生改变,如果原本值为 A,被改为 B 之后,又被改为了 A,那么 CAS 是不会发现值被改编过了的
ABA 问题解决方案:为每个变量绑定版本号,A-->B-->A 加上版本号为:A1-->B2-->A3
# Lock 锁与 AQS
AQS 是抽象队列同步器,是 JUC 中的核心基础组件,AQS 是一个 FIFO 的双向队列,队列中存储的是 thread,JUC 中大部分同步工具类都是基于 AQS 的
线程在获取锁失败之后,会被封装成 Node 节点加入到 AQS 阻塞等待,当获取锁的线程释放锁之后,会从 AQS 队列中唤醒一个线程,AQS 队列如下:
# JUC 中 AQS 结构
JUC 包中提供的锁有:
- ReentrantLock 重入锁
- ReentrantReadWriteLock 读写锁
- StampedLock 重入读写锁,JDK1.8 引入
而 AQS 也就是 AbstractQueuedSynchronized,是一个同步器,是 JUC 的基础工具类
接下来 ReentrantLock 加锁需要使用到 AQS,因此这里先对 AQS 整体框架做一个介绍, 具体的 AQS 一些方法操作 ,会在下边讲解 ReentrantLock 的时候介绍!
# AQS 数据结构
// AQS 队列头结点
private transient volatile Node head;
// AQS 队列阻塞尾节点
private transient volatile Node tail;
// 当前锁的状态,0:没有被占用,大于 0 代表有线程持有当前锁
// 当 state > 1 时,表锁被重入了,每次重入都加上 1
private volatile int state;
// 代表当前持有独占锁的线程
private transient Thread exclusiveOwnerThread;
# AQS 的 state 状态
AQS 的 state 状态代表锁是否被占用
如果 AQS 的 state 状态为 0 表示当前锁没有被占用
如果 AQS 的 state 状态 > 0 表示当前锁被占用
为什么 > 0 是被占用呢?
因为可能会发生锁的重入,每次重入会给 state + 1
线程通过 CAS 抢占锁
那么线程来抢占锁,就是通过 CAS 来更新 state 状态,由 0 更改为 1,才算抢锁成功
当没有抢到锁的线程,会被封装为 Node 节点进入 AQS 的队列等待,该节点是由前边一个节点来进行 唤醒
的
# AQS 中 Node 的数据结构
AQS 中的 Node 就是对线程的封装,等待锁的线程封装为 Node 进入队列排队,数据结构如下:
// 当前节点的等待状态
volatile int waitStatus;
// 前继指针
volatile Node prev;
// 后继指针
volatile Node next;
// 当前节点中的线程
volatile Thread thread;
// Condition Queue 中的内容,这里不介绍
Node nextWaiter;
waitStatus 的状态有以下几个,各自的含义不同:
/***********waitStatus 的取值定义***********/
// 表示此线程取消了争抢这个锁请求
static final int CANCELLED = 1;
// 官方的描述是,其表示当前node的后继节点对应的线程需要被唤醒
static final int SIGNAL = -1;
// 表示节点在等待队列中,节点线程等待唤醒
static final int CONDITION = -2;
// 当前线程处在SHARED情况下,该字段才会使用
static final int PROPAGATE = -3;
# AQS 的作用
上边说了 AQS 是 JUC 的基础工具类,ReentrantLock 就是基于 AQS 来写的
那么我们也可以基于 AQS 来实现一个同步工具,如下 Lock 来源为美团技术团队案例代码:
public class LeeLock {
private static class Sync extends AbstractQueuedSynchronizer {
@Override
protected boolean tryAcquire (int arg) {
return compareAndSetState(0, 1);
}
@Override
protected boolean tryRelease (int arg) {
setState(0);
return true;
}
@Override
protected boolean isHeldExclusively () {
return getState() == 1;
}
}
private Sync sync = new Sync();
public void lock () {
sync.acquire(1);
}
public void unlock () {
sync.release(1);
}
}
同步工具使用:
public class LeeMain {
static int count = 0;
static LeeLock leeLock = new LeeLock();
public static void main (String[] args) throws InterruptedException {
Runnable runnable = new Runnable() {
@Override
public void run () {
try {
leeLock.lock();
for (int i = 0; i < 10000; i++) {
count++;
}
} catch (Exception e) {
e.printStackTrace();
} finally {
leeLock.unlock();
}
}
};
Thread thread1 = new Thread(runnable);
Thread thread2 = new Thread(runnable);
thread1.start();
thread2.start();
thread1.join();
thread2.join();
System.out.println(count);
}
}
# 深入了解 ReentrantLock 底层原理细节
前言:
本来写这篇文章是因为自己对 ReentrantLock 的了解一直都比较表面,原理了解的还不是那么的清楚,并且对 AQS 也都是一知半解,但是在面试中这些都是比较常问的内容,如果原理了解比较清楚的话,对面试会有很大的帮助,因此自己在这里也总结了一份,本来以为很快就写完了,没想到竟然写了两天,文字的话就有 6000 +,ReentrantLock 中大部分的内容也都说到了,并且配有许多图片,如果感觉内容太多的话,可以对 ReentrantLock 中加锁以及解锁内容重点阅读,希望对你有所帮助!
这里我们来讲一下 ReentrantLock 底层加锁的原理
其中 ReentrantLock 底层加锁主要是依靠于 AQS(AbstractQueuedSynchronizer) 来做的,AQS 是 JUC 包下的基础工具类
从名字就可以看出来 AQS 是一个同步器,用于 管理多线程环境下获取锁
的问题,接下来会介绍 ReentrantLock 底层原理 以及 AQS 细节!
# ReentrantLock 构造方法
看 ReentrantLock 的底层原理的话,从它的加锁方法入手:
public class ReentrantLockDemo {
public static void main(String[] args) {
ReentrantLock lock = new ReentrantLock();
lock.lock();
}
}
先来简单看一下 ReentrantLock 的 构造方法
:
public ReentrantLock() {
sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
有两个构造方法:
- 默认的无参构造方法的话,将
sync
变量声明为了非公平锁 - 有参构造方法传入了
boolean
变量,如果是 True,则使用公平锁;如果是 False,则使用非公平锁
sync 变量是什么?
sync 变量是 Sync 其实就是在 ReentrantLock 中声明的静态类,继承自 AbstractQueuedSynchronizer
也就是 AQS
而在 ReentrantLock 的构造方法中声明的 FairSync
和 NonfairSync
也都是继承自 Sync 类
ReentrantLock 的 lock 上锁方法最终就是走到了这两个类中,关系图如下:
# ReentrantLock 非公平锁加锁原理
上边说完了 ReentrantLock 构造方法,接下来看一下加锁的流程是怎样的
先进入到了 ReentrantLock # lock() 方法:
// ReentrantLock
public void lock() {
sync.lock();
}
sync 是什么在上边我们已经说过了,直接跟随源码向下走,这里先说一下 非公平锁
的加锁流程(走到了 NonfairSync 类中):
// ReentrantLock # Sycn # NonfairSync # lock()
final void lock() {
// CAS 操作抢锁
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
可以看到,在 lock() 方法中,主要有两个 if 分支:
compareAndSetState(0, 1)
成功:这个 CAS 操作用来修改 AQS 队列中的 state 变量,从 0 修改为 1,表示当前线程要加锁了,CAS 成功之后,通过setExclusiveOwnerThread
将当前线程设置到 AQS 中去compareAndSetState(0, 1)
失败:说明加锁失败,直接调用 acquire(1) 操作(之后会讲这个操作)
那么可以看到,如果 CAS 操作抢到锁之后,接下来就拿到锁了,可以执行同步代码块中的操作了
那么如果 CAS 操作没有拿到锁
的话,会进入到 acquire 方法中
- 接下来看一下 acquire() 方法
上边说了,CAS 失败,说明当前线程没有抢到锁,也就是当前线程来 CAS 的时候,发现 state 本来的值不是 0,也就是说锁已经被持有了,那么就有两种情况了:
1、锁的重入 :发现是当前线程持有的锁,因此直接重入
2、当前线程进入队列等待 :发现是其他线程持有锁,因此进入队列等待获取锁
在 acquire 方法中,主要就做上边两件事情,代码如下:
// AbstractQueuedSynchronized
// 方法使用 final 定义,无法重写
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
那么在 acquire 中,有 4 个方法,接下来我们一个一个看这些方法是做什么的:
- 首先看 tryAcquire 方法
tryAcquire 方法在 FairSync 和 NonFairSync 中都有实现,这里我们先看 NonFairSync 的实现:
// NonFairSync
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
可以看到,tryAcquire 最终调用了 nonfairTryAcquire()
,来看一下它的执行流程
首先通过 getState() 取出 AQS 队列的 state 状态值
如果 state == 0,说明锁没有被占用,于是通过 CAS 抢锁,抢到之后将持有锁线程设置为自己,是不是很熟悉呢?
在进入 nonfairTryAcquire()
之前其实就已经通过 CAS 抢锁失败了,但是这里再抢一次,万一其他线程已经释放了呢?
那么如果 current == getExclusiveOwnerThread() 的话,表明 当前线程和持有锁的线程是同一个
,那么直接重入就可以了,重入的次数在 AQS 的 state 值记录,可以看到,将 state + 1 即可
如果 CAS 没抢到,并且不是重入的话,那就返回 false,这一次的 tryAcquire() 就算失败了,进入接下来的流程
- tryAcquire 失败之后,当前线程进入等待队列
// AbstractQueuedSynchronized
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
为了更方便大家观看,将 acquire 的代码重新贴一下,上边在 tryAcquire 中尝试加锁失败之后,接下来会执行 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
操作,这里我们看一下这两个方法
首先看一下 addWaiter() 这个方法:
// AbstractQueuedSynchronized
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
在这个方法中,先将当前 Thread 给包装成了 Node 节点,目的就是向 AQS 中的队列中存放
判断 if (pred != null)
,说明 AQS 队列里的已经有节点了,也就是 AQS 队列已经被初始化过了,直接将当前 Node 加入到 AQS 队列去即可,分为了 3 步:
1、声明 pred 变量为 tail,让 node 的前继指针指向 pred
2、通过 CAS 将 AQS 中的 tail 指针指向 node(新加入的 node 节点作为 tail 存在)
3、CAS 成功后,让 pred 的后继指针指向 node
经过这样一通操作,我们刚入队的这个 node 节点就成为了 tail 节点加入到了 AQS 队列中,如下图:
如果 pred == null
,也就说明 AQS 的队列中,tail 指针没有指向元素, 也就是表明了 AQS 队列此时还没有初始化 ,接下来就通过 enq(node)
将 AQS 初始化并把这个 node 插入进去:
// AbstractQueuedSynchronized
private Node enq(final Node node) {
for (;;) {
Node t = tail;
// 没有初始化
if (t == null) {
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
在这个方法中,首先判断 if (t == null)
说明 AQS 中的队列都还没有初始化,因此这里初始化一下队列,初始化操作为 :创建一个节点,让 AQS 的 head 和 tail 两个指针都指向该节点
由于在 for 里是不停循环的,因此在初始化队列之后,就发现 t != null
了,因此就可以将 node 节点插入到 AQS 的队列后边去:
好,那么到这里,当前线程节点进入 AQS 的等待队列的操作就完成了,我们继续往下看
- 那么当前线程节点已经进入等待队列了,那么看一下接下来还会进行哪些操作?
// AbstractQueuedSynchronized
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
这里还是为了大家方便看,将 acquire 的代码继续贴出来
上边将 addWaiter() 方法讲完了,node 节点此时已经进入到了 AQS 的等待队列中,那么接下来肯定要将线程给挂起了!
这里我们继续看一下 acquireQueued()
方法:
// AbstractQueuedSynchronized
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
在该方法中,先来看 for 循环,先取当前节点的前驱节点,如果前驱节点是头节点的话,那么就说明当前的线程节点已经是 AQS 队列中的第一个节点了,那么他就可以尝试去加锁了,因此这里再通过 tryAcquire 再抢一下锁试试(你可能已经忘了 tryAcquire 是做什么的了,其实就是 CAS 抢锁或者判断是否是重入锁)
如果 tryAcquire 抢到锁了之后,当前节点就没必要在 AQS 的队列中等待获取锁了,因此就将当前 node 节点从队列移除
这里删除当前 node 节点是通过 setHead() 方法删除了,其实就是让 AQS 队列的 Head 指针指向当前 node 节点,让当前的 node 节点作为虚拟头节点,再将原来的虚拟头节点的后继指针设置为空,因此这里注释中的 help GC
其实指的是帮助原来虚拟头节点的 GC 回收 ,如下图:
上边说的是当前节点是 AQS 队列中第一个节点的情况,那么如果当前节点不是 AQS 队列中的第一个节点的话,就要走下边的这个逻辑了:
// AbstractQueuedSynchronized
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
首先要通过 shouldParkAfterFailedAcquire()
方法判断当前 node 节点抢锁失败之后,应不应该被挂起?
这里为什么要这样判断呢?
因为如果上一个线程拿到锁了之后,在释放锁时,要对当前线程进行唤醒操作,如果上一个线程的状态异常,无法对当前线程唤醒,那你直接把当前线程挂起岂不是要出问题了,直接看代码:
// AbstractQueuedSynchronized
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
判断上一个节点异常就是通过 waitStatus 来判断的,首先拿到当前节点的前继节点的 waitStatus,声明为 ws 变量,waitStatus 的取值都在 Node 类中定义了:
// AbstractQueuedSynchronized # Node
/***********waitStatus 的取值定义***********/
// 表示此线程取消了争抢这个锁请求
static final int CANCELLED = 1;
// 官方的描述是,其表示当前node的后继节点对应的线程需要被唤醒
static final int SIGNAL = -1;
// 表示节点在等待队列中,节点线程等待唤醒
static final int CONDITION = -2;
// 当前线程处在SHARED情况下,该字段才会使用
static final int PROPAGATE = -3;
如果 ws == Node.SIGNAL
,说明前继节点可以对当前节点进行唤醒,直接返回 true 就好了(官方对 Node.SIGNAL 的描述是,其表示当前node的后继节点对应的线程需要被唤醒)
如果 ws > 0
,那就说明前边的这个线程取消了争抢这个锁的请求,那么前边的节点肯定就无法对当前节点唤醒了,因此把前边状态异常的节点直接跳过就好了,找到一个状态为 Node.SIGNAL
的节点即可,如下图:
如果 ws != Node.SIGNAL && ws <= 0
, 这说明了前边节点的状态不是 Node.SIGNAL,因此要将前边节点的状态设置为 SIGNAL
,表示前边的节点需要对它的后继节点进行唤醒操作!
// AbstractQueuedSynchronized
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
这里将代码重复贴一下,上边通过 shouldParkAfterFailedAcquire 判断了当前线程节点是否可以被挂起
如果可以被挂起的话,就调用 parkAndCheckInterrupt()
进行挂起:
// AbstractQueuedSynchronized
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
代码很简单,就是通过 LockSupport.park()
对当前线程挂起,这里注意在线程挂起之后 ,就没有执行接下来的 Thread.interrupted() 方法了,这个方法就是返回线程的中断状态(这里先讲一下每个方法的作用,具体的 线程唤醒流程
可以往后看)
// AbstractQueuedSynchronized
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
如上,到此为止在 acquire() 方法中的 if 条件就已经讲完了,这个 if 满足的话,就说明当前线程目前已经被挂起了,之后进入到 selfInterrupt()
方法
其实在上边 parkAndCheckInterrupt()
方法中,挂起当前线程之后还检测了当前线程的中断状态并返回,但是这个方法会清除掉线程的中断状态:
Thread.interrupted();
因此,在 acquire() 的 if 判断都通过之后,需要调用 selfInterrupt()
再将中断标志重新设置给当前线程,如下:
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
上边这两个 interrupt 方法的作用不同:
interrupt: 是给线程设置中断标志
interrupted: 是检测中断并清除中断状态
加锁流程总结
到此为止,ReentrantLock 整个加锁的流程就已经说完了,上边的流程还是比较长的,因此这里再简化一下
1、首先,加锁无非就是公平锁和非公平锁,最总走到 FairSync 或者 UnFairSync 中的加锁方法
2、这里以非公平锁为例,首先就是 CAS 抢锁(通过 CAS 设置 AQS 的 state 值)
3、如果当前抢到锁,那就将 AQS 的持有锁线程设置为当前线程
4、如果没有抢到锁,就需要将当前线程包装成 Node 节点进入 AQS 队列中排队了
5、不过在排队之前,又尝试了 CAS 抢锁,并且判断了持有锁的线程是否是当前线程,实现了可重入的逻辑
6、如果还没抢到,当前线程的 Node 节点就进入到 AQS 排队了
7、那么由于当前线程进入队列中是需要挂起的,因此需要前边的节点对当前线程节点进行唤醒,因此需要保证当前线程前的节点可以唤醒当前线程,也就是判断前边的节点状态是否为 SIGNAL,将状态异常的节点直接跳过即可
8、那么保证了前边节点可以对当前线程唤醒之后,就可以将当前线程给挂起了,通过 LockSupport.park(接下来的流程,等待线程被唤醒之后,会继续执行)
看完加锁流程,应该对大部分的代码就已经比较熟悉了,那么加锁的流程已经讲过了,大家可以自己看一下 解锁
的过程,看看是否可以看懂,接下来会说一下解锁的流程!
# ReentrantLock 公平锁和非公平锁加锁的区别
上边说完了非公平锁的主干流程,将主干流程看完之后,其他的一些代码看起来应该难度不大
这里说一下非公平锁和公平锁的区别,其实只在 tryAcquire
方法中有不同的地方,这里只将不同的地方列了出来:
/*---------------------公平锁的 tryAcquire()--------------------------*/
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors()/*公平锁于非公平锁不同的地方*/ &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
// ...
}
/*---------------------非公平锁的 tryAcquire()--------------------------*/
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// ...
}
可以发现,在公平锁的 tryAcquire 方法上,只比非公平锁多了一个 !hasQueuedPredecessors()
方法
该方法是判断当前节点在队列中是否还有前继节点,如果有就返回 true;如果没有就返回 false
在公平锁中,要想 !hasQueuedPredecessors() == true
,必须 hasQueuedPredecessors()
返回 false,也就是当前节点在队列中没有前继节点,那么才可以通过 CAS 去抢锁,以此来保证公平!
# Node.CANCELLED 状态节点的产生
上边在讲加锁的流程,其实还有一个地方没有讲到,也就是产生 Node.CANCELLED 节点的操作没有说到,在 acquireQueued
方法中,如下:
// AbstractQueuedSynchronized
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
// AbstractQueuedSynchronized
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
在 acquireQueued 方法中的 finally 代码块
,如果判断当前节点没有成功获取到锁,会调用 cancelAcquire
方法标记当前节点状态为 CANCELLED:
private void cancelAcquire(Node node) {
// 过滤掉无效的节点
if (node == null)
return;
// 将节点上的线程清空
node.thread = null;
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
Node predNext = pred.next;
node.waitStatus = Node.CANCELLED;
// 分支1:当前节点是 tail 节点
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
int ws;
// 分支2:当前节点不是 head 指针的后驱节点,并且((前边节点的状态为 SIGNAL) || (前边节点的状态 <= 0 && 可以成功设置为 SIGNAL))
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
// 分支3:当前节点是 head 指针的后驱节点或者不满足分支 2 后边的条件
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
接下来我们说一下这个方法中的流程:
首先先拿到当前节点的前驱节点,声明为 pred
,拿到后驱节点声明为 predNext
,并且将当前节点的 waitStatus 设置为 Node.CANCELLED
接下来就是 if 条件判断了,其实是有 3 个分支:
先来看分支 1 :如果当前节点是 tail 节点,那么要对当前节点进行取消的话,直接让 tail 指针指向前边的节点就可以了,并且让 pred
节点的 next 指针设置为空
再来看分支 2: 当前节点不是 head 指针的后驱节点,并且((前边节点的状态为 SIGNAL) || (前边节点的状态 <= 0 && 可以成功设置为 SIGNAL)),如果都成立,再判断前边节点的 thread 是否为空,不为空就进入该分支
那么此时通过 CAS 操作将前一个节点的 next 指针指向后一个节点(下图红线)
在哪里修改了 predNext 节点的 prev 指针了呢?
可以看到上边红线就是修改的地方,虽然将 pred 的 next 指针指向了 predNext,但是并没有将 predNext 指针指向 pred,那么在哪里将 predNext 的 prev 指针指向前边的正常节点呢?
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
do {
// 修改 prev 指针,跳过 CALCELLED 状态的节点,指向正常节点
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
这里将 predNext 的 prev 指针修改放在了 shouldParkAfterFailedAcquire 方法中,如果 predNext 这个节点被唤醒,或者发生自旋都会执行 shouldParkAfterFailedAcquire 方法,在这里就会将自己的 prev 指针指向前边的正常节点,也就是 pred 节点,修改完之后,中间的 待取消节点
就被孤立起来了,之后会被 GC 掉
最后走到分支 3 : 如果当前节点是是 Head 的后驱节点或者是分支 2 后边的条件没有满足(也就是查看当前节点前边的节点中不存在 未取消状态的节点
)
那么此时就要去唤醒当前节点的后驱节点了
这里着重说一下这里为什么要去唤醒当前节点的后驱节点:
进入到这里,说明这个没有满足分支 2 的 if 语句,如下:
// 分支 2
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
1、首先 pred != head 和 pred.thraed == null 这两个条件如果都不满足,他的意思是前边就是头节点,并且当前节点都取消获取锁了,前边也没有线程拿锁,因此直接唤醒后边的线程节点去拿锁就好了
2、((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL)))
如果这个条件 不满足
,说明前驱节点状态不是 SIGNAL,或者通过 CAS 将前驱节点状态设置为 SIGNAL 失败了
compareAndSetWaitStatus 设置为 SIGNAL 失败 是因为在高并发情况下,前驱节点突然释放锁了,导致去 CAS 时发现前驱节点状态发生改变,CAS 失败
那么在前驱节点突然释放锁了之后,会对当前节点进行唤醒,结果当前这个节点取消拿锁了,因此当前节点状态也不是 SIGNAL,无法对后边的节点唤醒,因此这里手动对后边节点唤醒
# ReentrantLock 的解锁操作
解锁操作同样也很重要,解锁后会对后边的线程进行唤醒操作
还是从 unlock() 方法调用入手:
public static void main(String[] args) {
ReentrantLock lock = new ReentrantLock();
lock.lock();
lock.unlock();
}
直接向下走到核心代码:
// ReentrantLock
public void unlock() {
sync.release(1);
}
// AbstractQueuedSynchronizer
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
可以看到在 release() 方法中,先通过 tryRelease() 尝试释放锁,如果释放成功后,就对后边线程进行唤醒,先来看 tryRelease
:
// ReentrantLock
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
这里的流程就很简单了,releases 变量就是释放锁的个数,为 1,主要是减少重入的次数
令 c = state - release ,如果 c 为 0,表示没有当前线程已经完全不使用这个锁了(所有的重入都已经退出),将持锁线程设置为 null 并且将 AQS 的 state 设置为 0,表示目前锁没有被线程占用
当 tryRelease 返回 true 就表示锁已经释放了,接下来对后边线程进行唤醒即可,执行 unparkSuccessor
进行唤醒:
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
在这个唤醒的方法中,先使用 CAS 将当前节点的 waitStatus 改为 0,表示当前节点已经在对后边节点唤醒了
令 s = node.next
:
如果 s 是 null 或者它的 waitStatus > 0,也就是后边的节点是取消状态,因此要通过 for 循环,从后向前找到 node 节点后的第一个正常状态的节点,对这个正常状态节点进行唤醒
如果不满足上边的条件,说明当前节点后边的 s 节点状态正常,直接唤醒 s 节点就可以了
每一个线程节点都是在 acquireQueued
方法中挂起的,当被唤醒之后,就会在 acquiredQueued 中的 for 循环中自旋继续执行抢锁操作!
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 线程真正挂起的位置
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private final boolean parkAndCheckInterrupt() {
// 挂起
LockSupport.park(this);
return Thread.interrupted();
}
到此,释放锁以及线程唤醒后在哪里继续开始执行的操作就说完了!
这里说一下在 unparkSuccessor 中为什么要 从后向前
找到 node 后的第一个正常节点呢?
与 addWaiter() 入队列这个方法有关:
// AbstractQueuedSynchronized
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
可以看到,将线程节点加入队列需要两个操作:node.prev = pred
、 pred.next = node
但是这两个操作不是原子操作,我们先执行了 node.prev = pred
,之后通过 CAS 操作设置 Tail 指针之后,才执行 pred.next = node
于是极端情况下可能存在这种情况:当前节点的下一个节点是 CANCELLED 状态,此时新入队的节点只执行了 node.prev = pred
,还没来得及执行 pred.next = node
因此,如果 for 循环从前到后遍历的话,可能遍历到 CANCELLED 节点时,此时 next 指针还没有赋值,可能出现空指针的问题
head -> 当前节点 node -> CANCELLED 节点 -> 新入队节点
# ReentrantLock 和 synchronized 区别
首先在 JDK1.8 中,两者 在性能上基本上没有区别
,因为在 JDK1.6 之后,synchronized 已经做了锁升级的优化,性能相比之前提升许多
那么在真正使用的时候,要如何进行选择呢,毕竟两者都是进行线程同步操作的
- 使用方式
首先,使用方式上
,使用 ReentrantLock 的话,需要声明 ReentrantLock 变量,并且手动调用 lock 和 unlock,要正确处理异常情况,需要确保 unlock 成功执行,否则可能会导致所有线程都无法取到锁
那么使用 synchronized 来说的话,就比较简单了,需要对什么加锁,就直接加锁就好了,我们不需要去关心锁的添加和释放,都交给了 JVM 底层来完成
因此,他们在使用上的区别就是 ReentrantLock 使用起来较为麻烦一些,而 synchronized 使用起来比较方便
- 灵活性
那么接下来说一下 灵活性
,灵活性和方便程度恰恰是相反的,synchronized 使用起来虽然方便,但是不够灵活,因为 synchronized 中的线程等待锁是无法唤醒的,而 ReentrantLock 中,等待锁的线程是可以响应中断的
并且 ReentrantLock 中还支持多种锁 公平锁、读写锁
,而 synchronized 只支持 重入非公平锁
因此在高并发场景使用的时候,很显然 ReentrantLock 功能更多,更加灵活,并且既然都是高并发场景了,大部分人对 ReentrantLock 的使用以及理解都是比较好的,因此建议使用 ReentrantLock
而在 上下文切换
这一方面,两个锁其实都是要阻塞和唤醒线程,因此都会发生线程的上下文切换,但是 ReentrantLock 通过 CAS 进行优化,在线程阻塞之前,会进行多次 CAS 抢锁操作,因此降低了线程上下文切换的概率 ,而 synchronized 只要升级为了重量锁,线程来拿锁时一定会进入队列中阻塞等待
在 synchronized 使用中还有一个方面体现出了不灵活性,就是它的线程获取不到锁会一直等待: 即没有 超时自动取消获取锁
的操作,也就是 tryLock(timeout)
功能,在指定时间获取不到锁的时候,可以直接将线程超时了,不去拿锁了
- 为什么说需要
tryLock(timeout)
这个功能呢?
假设这样一种场景,有一个任务在某个时间点可能多个线程同时要来执行,但是只要有一个线程执行完毕之后,其他线程就不需要执行了
那么假设在这个需要执行任务的时间点,大量线程同时过来执行,也就是大量线程都进入阻塞队列等待获取锁,第一个线程拿到锁执行任务之后,此时后边的线程都不需要执行该任务了,但是由于没有这个超时功能,导致后边的线程还需要在队列中等待获取锁,再一个个进入同步代码块,发现任务已经执行过了,不需要自己再执行了,之后再退出同步代码块
因此这个 tryLock(timeout)
的作用就是 将大量线程的串行操作转为并行操作 ,大量线程发现指定时间内获取不了锁了,直接超时,不获取锁了,这样后边的线程再来看就发现任务已经执行过了,不需要再去获取锁执行任务了
这里 tryLock(timeout)
的情况只是举一个特殊的情况,其实是参考了分布式环境下,更新 Redis 缓存时会出现这种情况,但是在分布式环境下肯定不会使用 synchronized ,因此这里主要是举个例子说一下 tryLock 的作用!
# Java线程池的核心内容详解
# 线程池的优势
首先,线程池是将多个线程进行池化操作,统一进行管理,这样做有什么好处呢?
降低创建、销毁线程的开销
:线程池中维护固定数量的线程,不需要临时进行线程的创建和销毁提高响应速度
:对于新提交到线程池中的任务,直接使用线程池中的空闲线程可以直接进行处理,不需要等待创建线程节省资源
:可以重复利用线程
# 什么场景下要用到线程池呢?
一般就是多 IO 的场景下需要用到,像 IO 任务很多,比如数据库操作、请求其他接口操作,这都属于 IO 类任务,IO 类任务的特点就是只需要线程去启动一下 IO 任务,之后就等待 IO 结果返回即可,IO 结果返回的时间是比较慢的 ,因此如果只使用单线程去执行 IO 任务的话,由于这个等待时间比较长,那么线程需要一直等待 IO 结果返回,而无法执行其他操作
因此在多 IO 场景下,可以使用线程池来加快 IO 任务的执行,开启多个线程同时去启动多个 IO 任务,可以加快 IO 任务的处理速度
# 线程池中重要的参数【掌握】
线程池中重要的参数如下:
corePoolSize
:核心线程数量maximumPoolSize
:线程池最大线程数量 = 核心线程数+非核心线程数keepAliveTime
:非核心线程存活时间unit
:空闲线程存活时间单位(keepAliveTime单位)workQueue
:工作队列(任务队列),存放等待执行的任务- LinkedBlockingQueue:无界的阻塞队列,最大长度为 Integer.MAX_VALUE
- ArrayBlockingQueue:基于数组的有界阻塞队列,按FIFO排序
- SynchronousQueue:同步队列,不存储元素,对于提交的任务,如果有空闲线程,则使用空闲线程来处理;否则新建一个线程来处理任务
- PriorityBlockingQueue:具有优先级的无界阻塞队列,优先级通过参数Comparator实现。
threadFactory
:线程工厂,创建一个新线程时使用的工厂,可以用来设定线程名、是否为daemon线程等等。handler
: 拒绝策略 ,有4种- AbortPolicy :直接抛出异常,默认策略
- CallerRunsPolicy:用调用者所在的线程来执行任务
- DiscardOldestPolicy:丢弃阻塞队列里最老的任务,也就是队列里靠前的任务
- DiscardPolicy :当前任务直接丢弃
# 新加入一个任务,线程池如何进行处理呢?【掌握】
新加入一个任务,线程池处理流程如下:
- 如果核心线程数量未达到,创建核心线程执行
- 如果当前运行线程数量已经达到核心线程数量,查看任务队列是否已满
- 如果任务队列未满,将任务放到任务队列
- 如果任务队列已满,看最大线程数是否达到,如果未达到,就新建非核心线程处理
- 如果当前运行线程数量未达到最大线程数,则创建非核心线程执行
- 如果当前运行线程数量达到最大线程数,根据拒绝策略处理
# 如何将任务提交到线程池中呢?
有两种方式:execute
和 submit
这两种方式的区别:
- execute
- execute 没有返回值
- execute 无法捕获任务过程中的异常
- submit
- submit 会返回一个
Future
对象,用来获取任务的执行结果 - submit 可以通过 Future 对象来捕获任务中的异常
- submit 会返回一个
execute 方式如下:
ExecutorService executor = Executors.newFixedThreadPool(5);
executor.execute(new Runnable() {
public void run() {
// 执行具体的任务逻辑
System.out.println("Task executed using execute method");
}
});
executor.shutdown();
submit 方式如下:
ExecutorService executor = Executors.newFixedThreadPool(5);
Future<String> future = executor.submit(new Callable<String>() {
public String call() {
// 执行具体的任务逻辑
return "Task executed using submit method";
}
});
try {
String result = future.get(); // 获取任务执行结果
System.out.println(result);
} catch (InterruptedException e) {
// 处理中断异常
} catch (ExecutionException e) {
// 处理任务执行异常
} finally {
// 关闭线程池
executor.shutdown();
}
# 线程池是如何关闭的呢?
通过调用线程池的 shutdown()
方法即可关闭线程池
调用之后,会设置一个标志位表示当前线程池已经关闭,会禁止向线程池中提交新的任务
去中断所有的空闲线程并且等待正在执行的任务执行完毕(通过调用线程 interrupt()
方法),当线程池中所有任务都执行完毕之后,线程池就会被完全关闭
扩展:thread.interrupt() 方法调用后线程会立即中断吗?
不会,调用 interrupt 只是将被中断线程的中断状态设置为 true,通知被中断的线程自己处理中断,而不是立即强制的让线程直接中断(强制中断不安全)
当外部调用线程进行中断的命令时,如果该线程处于被阻塞的状态,如 Thread.sleep(),Object.wait(),BlockingQueue#put,BlockingQueue#take 等等时,那么此时调用该线程的 interrupt 方法就会抛出 InterruptedException 异常
因此,可以通过这个特点来优雅的停止线程(在 《Java多线程核心技术》 一书中说到):将 sleep() 和 interrupt() 搭配使用,来停止线程
# 线程池为什么设计为任务队列满了才创建新线程?
这里说一下在知乎上看到的一个问题,个人觉得提问的比较好
线程池为什么设计为队列满+核心线程数满了才创建新线程?而不是队列积压一定阈值的时候创建新的线程?
当队列积压满了之后,创建非核心线程来执行任务只是一个 兜底措施
你想如果我们自己去设计一个线程池,是不是只需要一个参数来管理线程池中的线程数量就可以了,完全没必要去创建这些非核心线程执行任务
那么线程池的设计团队可不会考虑的这么简单,它们不仅会考虑性能方面,更是会保证比较高的 可用性
因为在 Java 应用中,高并发 和 高可用 这两块都是比较重要的东西,不仅要性能好,还要不崩溃
就比如之前滴滴故障、阿里云故障、语雀故障所带来的影响都是比较大的,对公司来讲整个可信度有所下降,对于我们个人来讲,可能有些人恰巧需要紧急使用,但是由于发生故障,不得已计划延期
所以线程池为了保证 高可用 就设计了任务队列,以及在队列满了之后再去创建非核心线程处理溢出来的任务
当然任何设计都是平衡之后的选择,如果你在公司项目需求与设计者的理念不符合,可以基于原有设计做出封装,来进行定制化操作!
# 线程池中线程异常后,该线程会销毁吗?
向线程池中提交任务有 execute()
和 submit()
,两种提交方式的区别如下:
execute 执行任务:execute 没有返回值,无法捕获任务过程中的异常
submit 执行任务:submit 会返回一个
Future
对象,用来获取任务的执行结果,可以通过 Future 对象来捕获任务中的异常
那么执行过程中发生异常,线程会销毁吗?
execute 无法捕捉任务过程中的异常是因为当任务在执行时遇到异常的话,如果异常在线程执行过程中没有被捕获的话,该异常就会导致线程停止执行,并且在控制台打印异常,之后该线程会终止,线程池会创建一个新线程来替换他
submit 方式执行任务的话,当执行过程中发生异常,异常会被封装在 submit()
返回的 Future
对象中,当调用 Future.get()
时,可以捕获到 ExecutionException
异常,因此使用 submit()
发生异常不会终止线程
参考:线程池中线程异常后:销毁还是复用? (opens new window)
# 关于线程池在生产环境中的使用
这里整理了一些线程池在生产环境中使用的建议来帮助我们更好的在项目中使用线程池
# 一个项目使用一个线程池还是多个线程池?
一般建议是不同的业务使用不同的线程池,从而避免非核心业务对于核心业务的影响
如果所有的业务使用同一个线程池,非核心业务可能执行速度很慢,从而占用了很多线程迟迟不归还,导致核心业务在任务队列中等待,拿不到线程执行
并且还可能造成 死锁问题
,当父子任务使用同一个线程池时,父任务如果将核心线程全部占用之后,等待子任务完成,由于核心线程没有空闲的,导致子任务进入到任务队列中等待线程资源,导致父子任务之间互相等待
# 线程池在 RocketMQ 中的使用
在 MQ 中使用了很多线程池,这里说一下在发送消息时使用的线程池:
1、任务队列:创建了 异步发送者线程池
,任务队列
使用长度为 50000 的阻塞队列
2、线程数:核心线程数
和 最大线程数
相同,为 CPU 核数
3、存活时间:非核心线程存活时间
60s
4、线程名称:重写了线程工厂,主要是 为了线程的命名规范
,这样在查询日志时,只要做好业务之间的隔离,就可以很容易的根据线程名称来定位到对应的业务,便于分析线上问题
private final ExecutorService defaultAsyncSenderExecutor;
private final BlockingQueue<Runnable> asyncSenderThreadPoolQueue;
this.asyncSenderThreadPoolQueue = new LinkedBlockingQueue<Runnable>(50000);
this.defaultAsyncSenderExecutor = new ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors(),
Runtime.getRuntime().availableProcessors(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.asyncSenderThreadPoolQueue,
new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "AsyncSenderExecutor_" + this.threadIndex.incrementAndGet());
}
});
那么我们在自己的项目中使用的线程池就可以参考 MQ 中的用法,更加规范的使用线程池
至于为什么要这样设置核心线程数,一方面是参考了设置核心线程数的经验(CPU 密集型的任务令线程数等于 CPU 核心数,减少了线程之间的上下文切换,速度比较快),另一方面 RocketMQ 肯定内部经过性能测试,发现这样设置性能比较好一些
# 关于线程数量的设置
在项目中,一般使用线程池的场景无非就两种:
及时性任务
:需要迅速完成,降低用户等待时间非及时性任务
:批量完成任务,一般是后台任务
那么对于 及时性任务
来说,需要尽可能快的完成任务,因此要 尽可能增大可执行任务的线程数量
,来尽可能快的完成任务,不要设置任务队列
,因为只有任务队列满了之后,才会去创建非核心线程执行
对于 非及时性任务
来说,这类任务并不面向用户,特征是任务量很大,需要批量处理,不需要很低的延迟,因此需要设置合适线程数量, 利用有限的资源去尽可能快的执行任务
,并且设置任务队列去缓冲任务,但是尽量不要使用无界的任务队列,无界队列任务堆积过多会造成 OOM
- 这里举一个线程池在高并发电商系统中的使用案例
这里我举一个使用线程池的真实生产环境的案例:用户消息推送
对于中大型电商系统来说,用户量一般最少都达到了千万级,那么如果举办促销活动或者优惠活动了,电商系统肯定需要给用户发送通知,可能会有多个渠道发送比如短信、邮箱等等,那么肯定是需要调用第三方平台的 API 了
调用其他平台 API,毫无疑问就会产生网络 IO,并且是 千万级别的网络 IO ,如果只靠单线程去执行,那可能等推送完之后,促销活动也已经结束了
因此,对于这种 IO 任务,并且是大体量推送的 IO 任务,就必须引入线程池来优化性能了,通过多线程来进行任务的推送(当然这里还使用了 RocketMQ 来进行解耦,引入 MQ 之后,就是使用线程池来生成大量消息推送到 MQ 中,消费者再去订阅这些消息去调用第三方平台进行推送,由于该文章主要是讲线程池的,所以这里 MQ 的部分就简单说一下)
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
0,
permits * 2,
60,
TimeUnit.SECONDS,
new SynchronousQueue<>(),
NamedDaemonThreadFactory.getInstance(name)
);
这里也将线程池创建的代码给列出来,这里顺带说一下线程池核心线程的参数为什么设置为 0:
因为在消息推送这块,并不是一直要推送的,促销活动、发优惠券,在正常情况下是不会推送发送消息的,因此将核心线程数设置为 0 可以在没有推送任务的时候,将线程池中的线程都回收掉,有任务的时候,再来创建非核心线程执行任务,这样可以避免线程在没有任务时空闲,占用资源
这里注意任务队列的选用
将核心线程数设置为 0 之后,队列使用了 SynchronousQueue
,因为这个队列是不存储元素的,因此有任务来了就会创建非核心线程去执行
如果将设置了有容量的任务队列,任务进来之后会先放在队列中,并不会创建非核心线程!
# 美团技术团队针对线程池所做的优化
在美团内部有多次因为线程池参数设置不合理而引发故障的案例
因此可以发现在不同场景下,开发人员对参数的配置有一个大概的方向,但是具体配置多少还没有一个通用的公式 ,导致上线之后,线程池会因为 线程数设置过少
或者 任务队列设置不合理
而出现故障
因此美团技术团队设计了 动态化线程池
,提供了对 线程池的监控
以及参数动态调整,这样在调整参数之后,通过监控可以看到整个线程池的负载情况,可以选出比较合适的参数方案
那么这里重点的优化提升就在于两点:
- 线程池参数的动态化设置
- 线程池监控
这里提一下在线程监控中,对线程池负载的定义
线程池的负载可以根据活跃的线程数和最大线程数的比值来反映
线程池活跃度 = activeCount/maximumPoolSize
,当活跃度升高,代表着线程池负载在逐步上升
还可以 从任务队列中等待的任务数量
或者 发生拒绝策略的次数
来反映
- 总结一下
线程池参数的设置没有一个通用的公式,要根据实际场景出发,在设置之后,可以对线程池的性能进行测试,像对线程池进行性能测试的话,就需要对线程池做监控,来看在不同参数下线程池处理任务时的负载表现,来设置更加合理的参数
# 自定义拒绝策略
在线程池中可以 自己去定义拒绝策略
,如果线程池无法处理更多的任务了,可以在自定义的拒绝策略中,将拒绝的任务 异步持久化
到磁盘中去,之后再通过一个后台线程去定时扫描这些被拒绝的任务,慢慢执行
保证严格的任务不丢失:如果线上机器突然宕机,线程池的阻塞队列中的请求怎么办?
如果宕机,重启之后,线程池阻塞队列中的任务就会全部丢失
如果想要解决这种情况的话,有这么一个 解决方案
:在将任务提交到线程池中去的时候,先把任务在数据库中存储一份,并记录任务执行的状态:未提交、已提交、已完成,执行完之后的话,将任务状态标记为 已完成,如果宕机后,导致任务丢失,就可以去数据库中扫描任务,重新提交给线程池执行
# 阿里手册中的线程池规范
在使用线程池的时候,需要注意一些规范,以免出现不必要的问题,可以参考阿里巴巴 Java 开发手册,如下:
线程池名称命名规范:
线程池创建规范:
# 京东并行框架asyncTool如何针对高并发场景进行优化?
由于最近在整理并发相关的内容,整理了 CompletableFuture、CAS、线程池这些方面的内容,但是通过理论知识我们只是学会了:怎么去用?应该怎么去用?
但是并没有学习别人如何去用,没有实际场景的示范,恰巧看到了 tianyaleixiaowu 作者开源出来的 asyncTool 并行框架 ,并且已经在 京东App后台接受苛刻、高并发、海量用户等复杂场景业务的检验测试
所以这篇文章就以这个并行框架为例,来说一下如何在高并发场景中保证比较好的性能,即如何通过 CompletableFuture、CAS、线程池去提升性能表现!
# asyncTool 介绍及使用
首先介绍一下这个框架的作用,主要是用来进行一些并行任务的编排的,以及任务执行时的一些监控和回调
那么你可能会想了,不是有 CompletableFuture 来做任务编排呢?为什么还需要这个框架?
这个作者也说了,CompletableFuture 虽然提供了任务编排的能力,但是尚有不足,比如我们有多个任务,并对他们编排,但是我们想要 了解每个任务在开始执行以及执行结束的情况 ,对这些情况进行监控,那么在这种情况下 CompletableFuture 就无能为力了!
这里我们举一个简单的使用例子,有 3 个任务,需要执行完 task1 之后再执行 task2,执行完 task2 之后再执行 task3,流程如下:
接下来定义任务,需要实现 IWorker、ICallback
两个接口,主要是定义其中的回调以及任务执行方法,这里可以不用具体了解,毕竟我们主要是看它是如何使用线程池的,只需要知道这个 MyTask1
是我们需要执行的任务即可
接下来我们定义测试类,这里使用了 3 个任务,只需要将上边定义的 MyTask1
再复制两份即可
在这个测试类中,创建了 3 个任务实例,并且定义了 3 个 WorkerWrapper
包装类,这个 Wrapper
主要对要执行的任务进行 包装、编排 ,比如我们定义了 workerWrapper1
并通过 next
方法指定下一个执行的任务是 workerWrapper2
,通过 next
进行任务的编排
最后通过 Async.beginWork()
来提交任务即可,接下来核心就看 asyncTool
是如何执行任务的
# CompletableFuture 和线程池配合使用
上边说如何使用,主要是为了找到任务开始执行的入口,从 入口 开始,看框架对于任务的处理:
从入口进入,最后走到下边这个 核心方法 中:
在这个方法中,可以看到是定义了一个 CompletableFuture 数组 ,来存储任务的异步执行结果
之后将我们定义的任务都扔到 线程池 中来执行,来将任务进行异步执行,提升任务执行速度,最后通过通过 CompletableFuture.allOf().get()
来阻塞等待所有任务执行完毕,最后返回即可
可以看到,在执行一些耗时操作中,异步化基本上都是必备的操作,也就是通过 CompletableFuture 和 线程池 来搭配使用,将任务的耗时操作异步化出去,尽量不影响主干流程
# 线程池的定义
上边说到了 asyncTool 中将 CompletableFuture 和 线程池 搭配使用,线程池具体如何定义的呢,这里其实使用了 newCachedThreadPool
线程池,具体的参数定义如下:
可以发现该线程池中并没有设置 核心线程 ,并且 线程的存活时间设置为 60s ,任务队列使用了 SynchronousQueue 任务队列,为什么要使用这个线程池呢?
先从场景来看,这个 asyncTool 框架主要是对提交的并行任务进行编排执行的,但是该框架其实并不知道任务什么时候会去提交,以及任务的数量大小的
所以在 asyncTool 中对于默认线程池的线程数量的设置就没有一个合适的值,如果设置的少了,可能任务提交多了之后, 导致任务堆积 OOM ;如果设置的多了,很多线程就会一直空闲,比较浪费线程资源
因此呢,就不设置核心线程,将最大线程数设置为 Integer.MAX_VALUE ,并且要与任务队列 SynchronousQueue 搭配使用
因为线程池的工作流程其实是先来判断有没有核心线程,没有核心线程的话,会将任务提到任务队列中阻塞等待,而 SynchronousQueue 这个任务队列是没有容量的,只做任务传递的作用,因此任务不会阻塞在队列中,直接会创建非核心线程执行任务(如果使用了 有容量的任务队列 ,就会出现问题了,当任务提交到任务队列之后,此时没有核心线程,任务会一直在任务队列中阻塞,得不到执行)
- 线程池参数这样设置的好处
当没有任务的时候,线程池中不需要再维护核心线程的存活,可以节约线程资源
当有任务提交的时候,线程池会根据任务的数量来创建对应的线程数量执行任务,这样就不会因为线程数设置过多或者过少而出现一些问题了
# CAS 的使用
asyncTool 框架的目的就是编排并行任务的执行,那么既然是并行任务肯定要保证多线程环境下任务不会重复执行这些情况出现
在 asyncTool 中并没有使用 synchronized 以及 ReentrantLock 这些比较重量级的锁,而是使用 CAS 来保证任务不会重复执行
这里看一下在这个框架中,任务真正被执行时的代码,如何来保证任务不被重复执行的(为了重点代码清晰,省略了一部分非核心代码):
可以看到,在真正执行任务之前,会先通过 CAS 来判断任务的状态是否是 INIT ,是的话,表示任务还没有被执行;如果不是的话,说明已经被执行过了,或者任务出现了异常,这里直接返回结果就好了
如果 CAS 操作失败,说明任务的状态并不是 INIT ,任务已经开始执行了,所以这里就不要重复执行了
并且在执行完任务之后,再次通过 CAS 操作判断任务状态,如果已经不是 WORKING ,说明其他线程已经执行完该任务了或者其他线程在执行时发现任务出现异常,因此这里就直接返回了,避免在后边重复的进行任务回调操作
# 为什么说 CAS 操作比较轻量呢?
这个如果了解 synchronized 的锁升级流程的话,应该就知道为什么
在 synchronized 锁升级的过程中,轻量级锁就是通过 CAS 来获取锁的,而重量级锁是通过 线程的阻塞、唤醒 进行线程之间的获取锁操作
那么 CAS 操作性能就高在了它只需要去内存中进行值的比对,发现内存的值和期望值不同,就直接返回操作失败就可以了
如果在 asyncTool 中使用 synchronized 来保证线程之间的同步的话,这还需要进行线程之间的阻塞、唤醒操作,因此会出现线程之间的上下文切换以及用户态和内核态之间的转换,导致性能开销比较大
所以在 asyncTool 不去对线程同步进行控制,而是通过 CAS 来避免一些因为重复操作可能会带来的问题,这样性能就比较高了
asyncTool 开源框架项目地址: https://gitee.com/jd-platform-opensource/asyncTool
如果需要测试代码的话,可以从我 fork 的仓库中拉取:https://gitee.com/qylaile/asyncTool
# CompletableFuture 原理与实践
# 1、CompletableFuture 使用
为什么要使用 CompletableFuture?
一个接口可能需要调用 N 个其他服务的接口,这在项目开发中还是挺常见的。举个例子:用户请求获取订单信息,可能需要调用用户信息、商品详情、物流信息、商品推荐等接口,最后再汇总数据统一返回。
如果是串行(按顺序依次执行每个任务)执行的话,接口的响应速度会非常慢。考虑到这些接口之间有大部分都是 无前后顺序关联 的,可以 并行执行 ,就比如说调用获取商品详情的时候,可以同时调用获取物流信息。通过并行执行多个任务的方式,接口的响应速度会得到大幅优化.
为什么不适用Future?
Future
对结果的获取不是很友好,只能通过阻塞或轮询的方式得到任务的结果,功能相比于 CompletableFuture 较少
因此,Java 8 才被引入的 CompletableFuture
可以帮助我们来做多个任务的编排,功能非常强大。
以下简称 CompletableFuture 为 CF
# CF 的创建
- 通过 new 关键字
- 基于静态工厂方法::
runAsync()
、supplyAsync()
。
使用 new 关键字创建
CompletableFuture<String> cf = new CompletableFuture<>();
如果已经知道结果,可以直接将结果复制给 CF
CompletableFuture<String> future = CompletableFuture.completedFuture("result value");
使用静态工厂方法创建
这里在创建任务的时候,推荐使用自定义的线程池,可以让我们清楚的知道任务运行在哪个线程之上,并且可以根据实际情况做线程池的隔离。
静态工厂方法有两个:runAsync():无返回值
、supplyAsync():有返回值
public void staticFactoryMethod() throws ExecutionException, InterruptedException {
CompletableFuture<Void> cf1 = CompletableFuture.runAsync(() -> {
System.out.println("runAsync创建,无返回值");
});
CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> {
System.out.println("supplyAsync创建,有返回值");
return "result";
});
System.out.println(cf1.get());
System.out.println(cf2.get());
/**
* 输出结果:
* runAsync创建,无返回值
* supplyAsync创建,有返回值
* null
* result
*/
}
# CF处理计算结果
当我们获取到异步计算的结果之后,还可以对其进行进一步的处理,比较常用的方法有下面几个:
thenApply()
:thenAccept()
thenRun()
whenComplete()
// 沿用上一个任务的线程池
public <U> CompletableFuture<U> thenApply(
Function<? super T,? extends U> fn) {
return uniApplyStage(null, fn);
}
//使用默认的 ForkJoinPool 线程池(不推荐)
public <U> CompletableFuture<U> thenApplyAsync(
Function<? super T,? extends U> fn) {
return uniApplyStage(defaultExecutor(), fn);
}
// 使用自定义线程池(推荐)
public <U> CompletableFuture<U> thenApplyAsync(
Function<? super T,? extends U> fn, Executor executor) {
return uniApplyStage(screenExecutor(executor), fn);
}
thenApply()
public void thenApply() throws ExecutionException, InterruptedException {
CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
System.out.println("supplyAsync创建,有返回值");
return "result";
});
CompletableFuture<String> cf2 = cf1.thenApply(res -> res + "调用thenApply");
System.out.println(cf2.get());
/**
* 调用结果:
* result调用thenApply
*/
}
thenAccept()
和thenRun()
:thenAccept
接收 Consumer 接口,thenRun
接收线程
CompletableFuture.completedFuture("hello!")
.thenApply(s -> s + "world!").thenApply(s -> s + "nice!").thenAccept(System.out::println);//hello!world!nice!
CompletableFuture.completedFuture("hello!")
.thenApply(s -> s + "world!").thenApply(s -> s + "nice!").thenRun(() -> System.out.println("hello!"));//hello!
whenComplete()
:接收2个输入对象进行消费
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "hello!")
.whenComplete((res, ex) -> {
// res 代表返回的结果
// ex 的类型为 Throwable ,代表抛出的异常
System.out.println(res);
// 这里没有抛出异常所有为 null
assertNull(ex);
});
assertEquals("hello!", future.get());
# 异常处理
- 使用
handle
处理异常
public void handleExe() throws Exception {
CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
if (true) {
throw new RuntimeException("抛出异常!");
}
return "result";
}).handle((res, ex) -> {
System.out.println("res:" + res);
System.out.println("ex:" + ex);
return res;
});
System.out.println(cf1.get());
/**
* 输出结果:
* res:null
* ex:java.util.concurrent.CompletionException: java.lang.RuntimeException: 抛出异常!
* null
*/
}
- 使用
exceptionally
处理异常
由于异步执行的任务在其他线程上执行,而异常信息存储在线程栈中,因此当前线程除非阻塞等待返回结果,否则无法通过try\catch捕获异常。CompletableFuture提供了异常捕获回调exceptionally,相当于同步调用中的try\catch。使用方法如下所示:
@Autowired
private WmOrderAdditionInfoThriftService wmOrderAdditionInfoThriftService;//内部接口
public CompletableFuture<Integer> getCancelTypeAsync(long orderId) {
CompletableFuture<WmOrderOpRemarkResult> remarkResultFuture = wmOrderAdditionInfoThriftService.findOrderCancelledRemarkByOrderIdAsync(orderId);//业务方法,内部会发起异步rpc调用
return remarkResultFuture
.exceptionally(err -> {//通过exceptionally 捕获异常,打印日志并返回默认值
log.error("WmOrderRemarkService.getCancelTypeAsync Exception orderId={}", orderId, err);
return 0;
});
}
有一点需要注意,CompletableFuture在回调方法中对异常进行了包装。大部分异常会封装成CompletionException后抛出,真正的异常存储在cause属性中,因此如果调用链中经过了回调方法处理那么就需要用Throwable.getCause()方法提取真正的异常。但是,有些情况下会直接返回真正的异常(Stack Overflow的讨论** (opens new window)),最好使用工具类提取异常,如下代码所示:
@Autowired
private WmOrderAdditionInfoThriftService wmOrderAdditionInfoThriftService;//内部接口
public CompletableFuture<Integer> getCancelTypeAsync(long orderId) {
CompletableFuture<WmOrderOpRemarkResult> remarkResultFuture = wmOrderAdditionInfoThriftService.findOrderCancelledRemarkByOrderIdAsync(orderId);//业务方法,内部会发起异步rpc调用
return remarkResultFuture
.thenApply(result -> {//这里增加了一个回调方法thenApply,如果发生异常thenApply内部会通过new CompletionException(throwable) 对异常进行包装
//这里是一些业务操作
})
.exceptionally(err -> {//通过exceptionally 捕获异常,这里的err已经被thenApply包装过,因此需要通过Throwable.getCause()提取异常
log.error("WmOrderRemarkService.getCancelTypeAsync Exception orderId={}", orderId, ExceptionUtils.extractRealException(err));
return 0;
});
}
上面代码中用到了一个自定义的工具类ExceptionUtils,用于CompletableFuture的异常提取,在使用CompletableFuture做异步编程时,可以直接使用该工具类处理异常。实现代码如下:
public class ExceptionUtils {
public static Throwable extractRealException(Throwable throwable) {
//这里判断异常类型是否为CompletionException、ExecutionException,如果是则进行提取,否则直接返回。
if (throwable instanceof CompletionException || throwable instanceof ExecutionException) {
if (throwable.getCause() != null) {
return throwable.getCause();
}
}
return throwable;
}
}
# 编排任务
thenCompose()
按顺序链接两个CompletableFuture
对象,实现异步的任务链。它的作用是将前一个任务的返回结果作为下一个任务的输入参数,从而形成一个依赖关系。thenCombine()
会在两个任务都执行完成后,把两个任务的结果合并。两个任务是并行执行的,它们之间并没有先后依赖顺序。- 如果我们想要实现 task1 和 task2 中的任意一个任务执行完后就执行 task3 的话,可以使用
acceptEither()
。
public void thenCompose() throws ExecutionException, InterruptedException {
CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> "hello!")
.thenCompose(res -> CompletableFuture.supplyAsync(() -> res + "world!"));
System.out.println(cf1.get());
/**
* 输出结果:
* hello!world!
*/
}
public void thenCombine() throws ExecutionException, InterruptedException {
CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> "hello!")
.thenCombine(CompletableFuture.supplyAsync(()-> "world"), (res1, res2) -> res1 + res2);
System.out.println(cf1.get());
/**
* 输出结果:
* hello!world
*/
}
# 并行运行多个任务
allOf
可以并行运行多个任务,等到所有任务运行完再返回anyOf
任意一个任务执行完之后,即可返回
public void paraTask() throws ExecutionException, InterruptedException {
CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> "task1");
CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> "task2");
CompletableFuture<String> cf3 = CompletableFuture.supplyAsync(() -> "task3");
CompletableFuture<String> cf4 = CompletableFuture.supplyAsync(() -> "task4");
CompletableFuture<String> cf5 = CompletableFuture.supplyAsync(() -> "task5");
CompletableFuture<String> cf6 = CompletableFuture.supplyAsync(() -> "task6");
CompletableFuture<Void> headerFuture = CompletableFuture.allOf(cf1, cf2, cf3, cf4, cf5,cf6);
headerFuture.join();
System.out.println(headerFuture.get());
System.out.println("完成");
}
# 2、CompletableFuture使用中常见问题
# 2.1 使用自定义线程池
如果在使用中,没有传入自定义线程池,将使用默认线程池 ForkJoinPool 中的共用线程池 CommonPool(CommonPool的大小是CPU核数-1,如果是IO密集的应用,线程数可能成为瓶颈)
如果执行两个任务时,传入了自定义的线程池,使用 thenRun 和 thenRunAsync 还有一点小区别;
- 当使用
thenRun
执行第二个任务时,将会使用和第一个任务相同的线程池 - 当使用
thenRunAsync
执行第二个任务时,那么第一个任务会使用自己传入的线程池,而第二个任务则会使用ForkJoin
线程池。(thenAccept、thenApply
同理)
在实际使用时,建议使用自定义的线程池,并且根据实际情况进行线程池隔离。避免核心业务与非核心业务竞争同一个池中的线程,减少不同业务之间相互干扰
# 2.2 线程池循环引用导致死锁
public Object doGet() {
ExecutorService threadPool1 = new ThreadPoolExecutor(10, 10, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(100));
CompletableFuture cf1 = CompletableFuture.supplyAsync(() -> {
//do sth
return CompletableFuture.supplyAsync(() -> {
System.out.println("child");
return "child";
}, threadPool1).join();//子任务
}, threadPool1);
return cf1.join();
}
对于上边代码,如果同一时刻有 10 个请求到达,threadPool1
被打满,而 cf1
的 子任务也需要使用到 threadPool1
的线程,从而导致子任务无法执行,而且父任务依赖于子任务,也无法结束,导致死锁。
参考文章:
Java Guide:https://javaguide.cn/java/concurrent/completablefuture-intro.html#%E4%BD%BF%E7%94%A8%E8%87%AA%E5%AE%9A%E4%B9%89%E7%BA%BF%E7%A8%8B%E6%B1%A0
美团技术团队:https://tech.meituan.com/2022/05/12/principles-and-practices-of-completablefuture.html