Java-1 并发

背景

多线程并发编程是开发高并发系统的基础,利用好多线程机制可以大大提高系统整体的并发能力以及性能。

从底层来说,

单核时代:多线程主要为了提高单进程利用CPU和IO的效率。

​ 假设只运行了一个线程,请求IO的时候,如果只有一个线程,那么这个线程被IO阻塞则整个进程被阻塞。

​ 如果运行了多个线程,当一个线程被IO阻塞时,其他线程还可以继续使用CPU。

多核时代:多线程主要为了提高进程利用多核CPU的能力。

​ 如果只用一个线程,只会有一个CPU核心被利用到。

​ 创建多个线程,这些线程可以被映射到底层多个CPU上执行,显著提高任务执行的效率。

使用并发编程的代价:

  1. 不合理的使用无法提高程序运行速度
  2. 可能内存泄漏
  3. 可能死锁
  4. 线程不安全

Java线程模型

Thread & Runnable

使用 Thread 类

想要使用多线程,首先需要一个“线程类”, Java提供了Thread类和Runnable接口来实现多线程。

Thread: 线程类。

Runnable: 函数式接口,只有一个 run() 方法,对任务的抽象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class Demo {
public static class Job implements Runnable {
@Override
public void run() {
System.out.println("MyThread");
}
}

public static void main(String[] args) {
new Thread(new Job()).start();
// 匿名类实现
new Thread(() -> {
System.out.println("Java 8 匿名内部类");
}).start();
}
}

注意要调用 start() 方法,线程才算启动。

虚拟机会为我们创建一个线程,然后等到这个线程得到时间片时调用 run() 方法。

Thread 类构造方法
1
2
3
4
5
6
7
8
9
10
11
12
13
// 片段1 - init方法
private void init(ThreadGroup g, Runnable target, String name,
long stackSize, AccessControlContext acc,
boolean inheritThreadLocals)

// 片段2 - 构造函数调用init方法
public Thread(Runnable target) {
init(null, target, "Thread-" + nextThreadNum(), 0);
}

// 片段3 - 两个对用于支持ThreadLocal的私有属性
ThreadLocal.ThreadLocalMap threadLocals = null;
ThreadLocal.ThreadLocalMap inheritableThreadLocals = null;

挨个解释一下入参

  1. ThreadGroup g 线程组,指定这个线程在哪个线程组下。默认继承父进程的。
  2. Runnable target 要执行的任务
  3. String name 线程的名字
  4. long stackSize 线程的栈大小。(一般不需要指定;即使指定了JVM也不一定听你的)
  5. AccessControlContext acc 不用。不关注。
  6. boolean inheritThreadLocals是否从父进程那里继承inheritableThreadLocals, Mtrace 基于这个实现。

实际使用时一般只调用两个基础的构造方法。

1
2
Thread(Runnable target)
Thread(Runnable target, String name)
Thread 关键方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public static native void sleep(long millis) throws InterruptedException;

public static native Thread currentThread();

public final native boolean isAlive();

public void interrupt()

public synchronized void start()

public void run() {
if (target != null) {
target.run();
}
}

// Object类方法
public final native void wait(long timeoutMillis) throws InterruptedException;

public final synchronized void join(final long millis)

挨个解释一下

  1. sleep native 方法,让出时间片,进入TIMED_WAITING状态,用于暂停执行;
  2. currentThread 静态方法,获取当前线程
  3. isAlive() native方法,调用了 start() 方法 且还没有结束
  4. interrupt() 中断
  5. start() 启动线程
  6. run() 任务。一般使用一个 Runnable 变量解耦线程对象和线程任务
  7. wait() Object类的方法。必须在同步块内调用。让出当前锁,放弃时间片,用于和其他线程之间的通信
  8. join()fork 相对,调用方阻塞并等待被调用方结束。内部通过 wait()方法实现
Thread 生命周期

Java线程在运行的生命周期中的某一时刻只可能处于下面6中状态的其中一个状态:

  • NEW:初始状态,被创建出来还没被调用start()
  • RUNNABLE:运行状态,线程被调用了start()等待运行的状态。
  • BLOCKED:阻塞状态,需要等待锁释放。
  • WAITING:等待状态,表示该线程需要等待其他线程做出一些特定动作(通知或中断)
  • TIMED_WAITING:超时等待状态,在指定的时间自行返回,而不像WAITING一直等待
  • TERMINATED:终止,表示已经运行完毕。

至此,涉及的模型如下

Callable & Future

上面使用 Runnable 有一个关键问题,就是 run() 方法是没有返回值的。

我们有时候需要线程帮我们执行一些任务,而且这些任务是有返回值的。

Java 提供了 Callable 接口和 Future 接口来解决这个问题。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@FunctionalInterface
public interface Callable<V> {
V call() throws Exception;
}

public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);

boolean isCancelled();

boolean isDone();

V get() throws InterruptedException, ExecutionException;

V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}

Callable 提供了一个有返回值的接口,使用上语义和 Runnable 类似。

Future 表示一个异步计算的结果,它提供了一些方法来检查计算是否完成,等待计算完成,并获取计算的结果。

通过线程池的一个例子看一下 这两个接口的使用

1
2
3
4
5
6
7
8
9
10
11
12
13
class Demo {
public static void main(String args[]) throws Exception {
// 使用
ExecutorService executor = Executors.newCachedThreadPool();
Future<Integer> result = executor.submit(() -> {
Thread.sleep(1000);
return 1;
});

// 注意调用get方法会阻塞当前线程,直到得到结果。
System.out.println(result.get());
}
}

这里线程池 ExecutorService 提供了一个 submit 方法,将一个 Callable 对象作为入参,可以得到一个 Future 对象,

后续通过 Future.get() 方法获得执行结果。

上面说了 Thread 最终是通过一个 Runnable 对象来指定任务,那么线程池传入一个 Callable 是如何转换的呢?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public interface Executor {
void execute(Runnable command);
}

public interface ExecutorService extends Executor { // ...}

public abstract class AbstractExecutorService implements ExecutorService {
//...

protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}

public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
}

这里可以看到,execute 确实是以 Runnable 对象作为入参。

使用入参 Callable 作为构造参数获得了一个 RunnableFuture 对象,然后将它传入 execute() 方法

这里出现了两个新的类,看一下它们的定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public interface RunnableFuture<V> extends Runnable, Future<V> {
void run();
}

public class FutureTask<V> implements RunnableFuture<V> {
private Callable<V> callable;
private Object outcome;

public FutureTask(Callable<V> callable) {
this.callable = callable;
}

public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
}

//核心逻辑
public void run() {
outcome = c.call();
}
public V get() {
return outcome;
}
}

可以看到 RunnableFuture 接口继承了 Runnable 接口 和 Future 接口, FutureTask 是它的一个实现类。

FutureTaskCallable 对象包装成了一个 Runnable 任务,传入了 Executor

线程模型现在变成了这样,引入了可获得任务结果的能力和简单提到了线程池。

Executor

上面提到了线程池。

为什么使用?有什么好处?
  1. 创建/销毁线程需要消耗系统资源,线程池可以复用已创建的线程
  2. 控制并发的数量。并发数量过多,可能会导致资源消耗过多,从而造成服务器崩溃。(主要原因)
  3. 可以对线程做统一管理
ThreadPoolExecutor 的使用

上面有一个demo,调用 submit 方法,或者在不需要返回值时调用 execute

1
2
3
4
5
6
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
ThreadPoolExecutor 的构造方法

Java中的线程池顶层接口是Executor接口,ThreadPoolExecutor是这个接口的实现类。

1
2
3
4
5
6
7
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)

挨个解释一下

  1. corePoolSize 核心线程数
    • 线程池中线程分为两种:核心线程和非核心线程。核心线程创建出来后什么都不干也会一直存活;非核心线程长时间闲置会被销毁。
  2. maximumPoolSize 最大线程数
    • 核心线程数 + 非核心线程数量
  3. keepAliveTime 非核心线程最大闲置时间
  4. unit 上面这个参数的时间单位
  5. workQueue 阻塞队列
  6. threadFactory 【非必传】线程工厂,在创建线程时统一设置一些参数
  7. handler 【非必传】拒绝策略,指定任务被拒绝时的策略,默认抛出异常
ThreadPoolExecutor 核心任务处理逻辑
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// 1.当前线程数小于corePoolSize,则调用addWorker创建核心线程执行任务
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 2.如果不小于corePoolSize,则将任务添加到workQueue队列。
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 2.1 如果isRunning返回false(状态检查),则remove这个任务,然后执行拒绝策略。
if (! isRunning(recheck) && remove(command))
reject(command);
// 2.2 线程池处于running状态,但是没有线程,则创建线程
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 3.如果放入workQueue失败,则创建非核心线程执行任务,
// 如果这时创建非核心线程失败(当前线程总数不小于maximumPoolSize时),就会执行拒绝策略。
else if (!addWorker(command, false))
reject(command);
}

总结一下处理流程

  1. 线程总数量 < corePoolSize,无论线程是否空闲,都会新建一个核心线程执行任务(让核心线程数量快速达到corePoolSize,在核心线程数量 < corePoolSize时)。注意,这一步需要获得全局锁。
  2. 线程总数量 >= corePoolSize时,新来的线程任务会进入任务队列中等待,然后空闲的核心线程会依次去缓存队列中取任务来执行(体现了线程复用)。
  3. 当缓存队列满了,说明这个时候任务已经多到爆棚,需要一些“临时工”来执行这些任务了。于是会创建非核心线程去执行这个任务。注意,这一步需要获得全局锁。
  4. 缓存队列满了, 且总线程数达到了maximumPoolSize,则会采取上面提到的拒绝策略进行处理。
ThreadPoolExecutor 如何做到线程复用?

我们知道,一个线程在创建的时候会指定一个线程任务,当执行完这个线程任务之后,线程自动销毁。但是线程池却可以复用线程,即一个线程执行完线程任务后不销毁,继续执行另外的线程任务。那么,线程池如何做到线程复用呢?

上面 execute 里面也只看到了 addWorker 就结束了,这个 Worker 是什么?它是怎么去执行任务的呢?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
private boolean addWorker(Runnable firstTask, boolean core) {
//1. retry 部分判断线程数量和线程池状态
retry:
for (int c = ctl.get();;) {
// Check if queue empty only if necessary.
if (runStateAtLeast(c, SHUTDOWN)
&& (runStateAtLeast(c, STOP)
|| firstTask != null
|| workQueue.isEmpty()))
return false;

for (;;) {
if (workerCountOf(c)
>= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateAtLeast(c, SHUTDOWN))
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}

//2. 创建一个Worker对象,从中取出thread并执行start。
// 如果创建失败则回滚
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int c = ctl.get();

if (isRunning(c) ||
(runStateLessThan(c, STOP) && firstTask == null)) {
if (t.getState() != Thread.State.NEW)
throw new IllegalThreadStateException();
workers.add(w);
workerAdded = true;
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}

这里出现了一个 Worker 类。

Worker类实现了Runnable接口,所以Worker也是一个线程任务。在构造方法中,创建了一个线程,线程的任务就是自己。

addWorker方法中 t.start,会触发Worker类的run方法被JVM调用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
private final class Worker extends AbstractQueuedSynchronizer implements Runnable
{
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}

public void run() {
runWorker(this);
}

final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
try {
task.run();
afterExecute(task, null);
} catch (Throwable ex) {
afterExecute(task, ex);
throw ex;
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
}

上面说到 addWorker 里核心逻辑其实是新建一个线程,将 worker 对象的 runWorker 方法作为线程的任务执行。

runWorker 方法中的核心逻辑就是

  1. 首先去执行创建这个worker时就有的任务(当执行完这个任务后,worker的生命周期并没有结束)
  2. while循环中,worker会不断地调用getTask方法从阻塞队列中获取任务然后调用task.run()执行任务。

这样达到复用线程的目的。只要getTask方法不返回null,此线程就不会退出。

再看一下 getTask 方法的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?

for (;;) {
int c = ctl.get();

// Check if queue empty only if necessary.
if (runStateAtLeast(c, SHUTDOWN)
&& (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}

int wc = workerCountOf(c);

// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}

try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}

核心线程的会一直卡在workQueue.take方法,被阻塞并挂起,不会占用CPU资源,直到拿到Runnable 然后返回(当然如果allowCoreThreadTimeOut设置为true,那么核心线程就会去调用poll方法,因为poll可能会返回null,所以这时候核心线程满足超时条件也会被销毁)。

非核心线程会workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) ,如果超时还没有拿到,下一次循环判断compareAndDecrementWorkerCount就会返回null,Worker对象的run()方法循环体的判断为null,任务结束,然后线程被系统回收 。

至此,线程模型变成这样:

CompletableFuture

Future在实际使用中有一些局限,比如不支持异步任务的编排组合、get()方法是阻塞的等

Java8 引入CompletableFuture来解决这些缺陷。

CompletableFuture提供了增强的Future特性,还提供了函数式编程、异步任务编排组合等能力。

CompletableFuture 的构造方法
1
2
3
4
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier);
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor);
public static CompletableFuture<Void> runAsync(Runnable runnable);
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor);

不指定线程池的话,就会使用公共的ForkJoinPool

CompletableFuture 的常用方法
1
2
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs);
public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn);

附:阻塞队列

下面还没写。。还在草稿

线程同步 & 通信

AQS

线程安全

CPU会对内存变量进行缓存。持有变量的一个“副本”。

实际执行时对变量的副本进行读写,就会出现不一致的情况。

使用关键字volatile修饰变量,就可以保证变量的可见性(即禁用CPU缓存)

volatile关键字还会禁止指令重排序,以双重检验锁实现单例来看:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class Singleton {

private volatile static Singleton uniqueInstance;

private Singleton() {
}

public static Singleton getUniqueInstance() {
//先判断对象是否已经实例过,没有实例化过才进入加锁代码
if (uniqueInstance == null) {
//类对象加锁
synchronized (Singleton.class) {
if (uniqueInstance == null) {
uniqueInstance = new Singleton();
}
}
}
return uniqueInstance;
}
}

uniqueInstance = new Singleton()这段代码分为三步执行

  1. uniqueInstance分配内存空间
  2. 初始化uniqueInstance
  3. uniqueInstance指向分配的内存地址

执行顺序可能变成1 -> 3 -> 2。多线程环境下可能会造成一个线程获得还没有初始化的实例

ThreadLocal

背景:给线程搞点自己的专属本地变量。

原理:

1
2
3
4
5
6
7
8
9
public class Thread implements Runnable {
//......
//与此线程有关的ThreadLocal值。由ThreadLocal类维护
ThreadLocal.ThreadLocalMap threadLocals = null;

//与此线程有关的InheritableThreadLocal值。由InheritableThreadLocal类维护
ThreadLocal.ThreadLocalMap inheritableThreadLocals = null;
//......
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public T get() {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null) {
ThreadLocalMap.Entry e = map.getEntry(this);
if (e != null) {
@SuppressWarnings("unchecked")
T result = (T)e.value;
return result;
}
}
return setInitialValue();
}

public void set(T value) {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null) {
map.set(this, value);
} else {
createMap(t, value);
}
}

参考资料

http://concurrent.redspider.group/article/01/2.html


Java-1 并发
https://yzaf.top/2023/java/Java-1/
作者
why
发布于
2023年7月15日
许可协议