f10@t's blog

Java多线程编程Review

字数统计: 4.7k阅读时长: 20 min
2023/03/21
loading

本篇复习一下Java中多线程编程技术,讨论如线程安全性和如何使用线程池的问题,并学习关于锁的相关编程方法。(ps:想我17年的i5-7200U只是个双核心四线程的,2022年换的笔记本都8c16了汗...AMD yyds嘿嘿

什么是线程?

进程我们都知道,一个进程中包含了要执行的指令、数据等等,通过使用多进程即可提高硬件的利用率。但是进程的创建、销毁需要消耗大量的系统资源。此外,多进程并行时变量的共享是一件很麻烦的事情。

因此,有了线程的概念。线程(Thread)是操作系统调度资源的最小单位,一个进程中可以包含多个线程,共享同一个进程空间中的变量。比如,当我们启动一个JVM进程时,可以通过JConsole观察到已经有一些线程在活动了,如垃圾回收线程、RMI线程等等。

一般的书中讲解声明周期的时候,都喜欢用5个状态来表示,如下5:

创建状态(New),可运行状态(Runnable),阻塞状态(Blocked),运行状态(Running),结束状态(Terminated)

线程生命周期
  • New:当使用关键字new新建了一个Thread对象时,就处在该状态,此时并未执行,并未产生真正的线程,只是一个对象
  • Runnable:当我们调用start()的方法后,JVM中就会创建一个真正的线程,此时该线程就是可以执行的状态,等待调度器调度获取CPU时间片
  • Running:当调度器选择了该线程时就可以运行了
  • Blocked:如下情况会导致一个线程被阻塞挂起:
    • 调用了sleep、wait方法,使得该线程加入了waitSet
    • 进行了某个阻塞的IO调用,如网络读取数据
    • 获得了某个锁资源,进入了该锁资源的阻塞队列
    • 调用yield主动放弃了CPU执行权
    • CPU调度器轮询使得该线程放弃了执行
  • Terminated:线程的最终状态,整个生命周期结束,可能的原因如下:
    • 线程运行正常结束
    • 线程运行出错意外结束
    • JVM崩溃导致所有的线程都退出

实际在JDK的Thread类源码中,线程的状态定义如下:

1
2
3
4
5
6
7
8
public enum State {
NEW,
RUNNABLE,
BLOCKED,
WAITING,
TIMED_WAITING,
TERMINATED;
}

共有六个,其中NEWRUNNABLEBLOCKEDTERMINATED与我们前述相同。没有RUNNING这个,而RUNNABLE其实已经暗含了RUNNING的意思了,即正在运行,但是有可能是在等待CPU时间片等资源。

Thread state for a runnable thread. A thread in the runnable state is executing in the Java virtual machine but it may be waiting for other resources from the operating system such as processor.

另外有WAITINGTIME_WAITING这两个状态。分别定义如下:

  • WAITING

    Thread state for a waiting thread. A thread is in the waiting state due to calling one of the following methods:

    • Object.wait with no timeout
    • Thread.join with no timeout
    • LockSupport.park

    A thread in the waiting state is waiting for another thread to perform a particular action. For example, a thread that has called Object.wait() on an object is waiting for another thread to call Object.notify() or Object.notifyAll() on that object. A thread that has called Thread.join() is waiting for a specified thread to terminate.

  • TIME_WAITING

Thread state for a waiting thread with a specified waiting time. A thread is in the timed waiting state due to calling one of the following methods with a specified positive waiting time:

  • Thread.sleep
  • Object.wait with timeout
  • Thread.join with timeout
  • LockSupport.parkNanos
  • LockSupport.parkUntil

因此如果我们用JDK中的状态来理解的话,应该是这样的:

如何使用线程

在Java中唯一创建线程对象的方法,就是使用new关键字创建Thread对象。具体的实现方式有两种:继承Thread类实现Runnable接口。

继承Thread类

当一个类继承了Thread类后,需要覆写Thread的run方法,并通过new关键字即可构建一个可以运行的线程对象。这里的核心其实是模板设计方法

看一个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package lzw.Thread.review.CreateThread;

/**
* 使用继承Thread类的方式定义线程对象
*
* @author lzwgiter
* @since 2023/02/10
*/
public class UsingThread extends Thread {
@Override
public void run() {
System.out.println("I come from a child class from Thread.");
}
}

// main函数
public class Demo {
public static void main(String[] args) {
Thread testThread = new UsingThread();
testThread.start();
}
}

输出:

继承Thread

我们只需要继承Thread类并覆写他的run方法,并通过调用start()方法即可。这里重点关注一个问题,模板设计模式体现在了哪里?

模板设计方法:父类定义算法结构,子类实现逻辑细节

我们debug看一下:

start0方法

注意到这里实际调用的是start0()方法,继续跟进一下:

本地方法start0

可以看到这是一个native方法,该方法的作用为使得JVM调用该线程的run方法。所以Thread.start()方法本质上会调用该线程的start0()方法,控制该线程启动。

模板在哪里呢?我们看一下Thread类原始的run()方法:

Thread.run()

该方法是Runnable接口定义的,在Thread类中为调用target的run方法。而对于继承Thread类的写法,这个target是不存在的,即null:

target

因此原始的Thread类中的run实际上是一个空方法。而start()函数会调用run()。这就是模板设计模式的体现了。

即子类(上述例子中的UsingThread)覆写父类(Thread)的逻辑细节(run方法),并由父类的算法逻辑调用具体细节(start方法)

实现Runnable接口

当一个类实现了该接口,并覆写了run方法后,即可以通过new关键字构造线程对象。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package lzw.Thread.review.CreateThread;

/**
* 使用实现Runnable接口的方式定义线程对象
*
* @author lzwgiter
* @since 2023/02/10
*/
public class UsingRunnable implements Runnable {
@Override
public void run() {
System.out.println("I come from a child class from Runnable.");
}
}

// main函数
public class Demo {
public static void main(String[] args) {
// Thread testThread = new UsingThread();
Thread testThread = new Thread(new UsingRunnable());
testThread.start();
}
}

运行结果:

使用Runnable创建线程

debug看一下流程:

可以看到仍然是要调用start0方法,只是此时,taget变量已经有指向了,即我们定义的实现了Runnable接口的对象。自然而然,start0方法也就使得我们进入了上述方法:

target的run方法

区别

继承Thread类的方式,不同线程不能共享同一个run执行单元,即各自运行各自的run方法;而Runnable方式,不同线程可以共享同一个run方法单元:

二者区别

啥意思呢?如果你是继承Thread的方式,不同线程想操纵同一个类内变量的话,那就得把它声明为静态变量了。

而继承Runnable接口的方式由于只存在这一个类实例,因此天然的大家可以操作同一个类内变量。

常见线程API

sleep

字如其意,让当前线程休眠,暂停执行。方法声明如下:

1
2
public static native void sleep(long millis) throws InterruptedException
public static void sleep(long millis, int nanos) throws InterruptedException

其中,第二个精度更精细,可以在睡眠毫秒数后,再睡眠纳秒。使用方法也很简单:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
package lzw.Thread.review.basics;

/**
* sleep函数的使用
*
* @author lzwgiter
* @since 2023/02/10
*/
public class UsingSleep {
public static void main(String[] args) {
try {
// 睡眠5s
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

当然使用JDK 1.5后提供的sleep的封装,TimeUnit类的话可读性会更强:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package lzw.Thread.review.basics;

import java.util.concurrent.TimeUnit;

/**
* sleep函数的使用
*
* @author lzwgiter
* @since 2023/02/10
*/
public class UsingSleep {
public static void main(String[] args) {
try {
// 日
TimeUnit.DAYS.sleep(1);
// 小时
TimeUnit.HOURS.sleep(1);
// 分钟
TimeUnit.MINUTES.sleep(1);
// 秒
TimeUnit.SECONDS.sleep(1);
// 毫秒
TimeUnit.MILLISECONDS.sleep(1);
// 微秒
TimeUnit.MICROSECONDS.sleep(1);
// 纳秒
TimeUnit.NANOSECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

这部分唯一一个注意的地方就是,sleep函数的调用不会使得当前线程释放已经持有的锁:

yield

这个方法比较少用,作用是暂时放弃当前的CPU资源。但是如果CPU资源充足的话,会忽略这个放弃请求。给一个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package lzw.Thread.review.basics;

import java.util.stream.IntStream;

/**
* 使用yield
*
* @author lzwgiter
* @since 2023/02/10
*/
public class UsingYield {
public static void main(String[] args) {
IntStream.range(0, 2).mapToObj(UsingYield::createThread).forEach(Thread::start);
}

public static Thread createThread(int index) {
return new Thread() {
@Override
public void run() {
if (index == 1) {
// 若为1号线程,则暂时让出CPU资源
yield();
}
System.out.println(index);
}
};
}
}

我们创建了两个线程并调用start方法,其中第二个线程若优先得到CPU资源,则会yield让出给第一个线程,由RUNNING状态转变为RUNNABLE状态。此时理论上输出应该始终是0, 1。但是前面也提到了,CPU资源充足时候,会忽略。

join

方法声明如下:

1
2
3
public final void join() throws InterruptedException;
public final synchronized void join(long millis) throws InterruptedException;
public final synchronized void join(long millis, int nanos) throws InterruptedException

第一个本质上调用的是join(0),第三个就是多了纳秒而已。主要看第二个。给一个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package lzw.Thread.review.basics;

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

/**
* @author lzwgiter
* @since 2023/02/10
*/
public class UsingJoin {

public static Thread createThread(int index) {
return new Thread(() -> {
for (int i = 0; i < 5; i++) {
System.out.println(Thread.currentThread().getName() + "#" + i);
}
}, "worker0" + index);
}

public static void main(String[] args) throws InterruptedException {
List<Thread> threads = IntStream.range(0, 2).mapToObj(UsingJoin::createThread)
.collect(Collectors.toList());
threads.forEach(Thread::start);

System.out.println(Thread.currentThread().getName() + "嗨害嗨, 我来了啊");

// 启动线程
threads.get(1).join();
threads.get(0).join();

System.out.println(Thread.currentThread().getName() + "刚才BLOCKED了,那俩线程已经TERMINATED了");
System.out.println(threads.get(0).getName() + " status: " + threads.get(0).getState());
System.out.println(threads.get(1).getName() + " status: " + threads.get(1).getState());
}
}

下面是输出结果:

join

可以看到,主线程调用worker01、worker02线程的join方法后,会从RUNNING状态变为BLOCKED状态,直到join对象的线程生命周期结束。中间可以看到二者轮流输出,这是抢占式CPU调度的结果,当他们都运行结束后,即TERMINATED后,主线程才会继续自己的工作。

interrupt

划重点,这个API很重要,函数声明如下:

1
public void interrupt();

该函数用于打断一个线程的阻塞状态。哪些函数会使得线程进入BLOCKED状态呢?如下:

  • Object.wait()、Object.wait(long)、Object.wait(long, int)
  • Thread.sleep()、Thread.sleep(long)、Thread.sleep(long, int)
  • Thread.join()、Thread.join(long)、Thread.join(long, int)
  • InterruptibleChannel的io操作
  • Selector的wakeup方法

而interrupt方法会打断上述线程的阻塞状态,因此上述方法也叫“可中断方法”。当一个线程在运行时,若执行中断操作,则会抛出InterruptedException异常,给一个例子。此外,我需要重点说明一下interrupted标识符

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package lzw.Thread.review.basics;

import java.util.concurrent.TimeUnit;

/**
* 中断一个线程
*
* @author lzwgiter
* @since 2023/02/12
*/
public class UsingInterrupt {
public static void main(String[] args) throws InterruptedException {
Thread threadToInterrupt = new Thread(() -> {
try {
// sleep一分钟,该方法为可中断方法
TimeUnit.MINUTES.sleep(1);
} catch (InterruptedException ex) {
ex.printStackTrace();
System.out.println("I am Interrupted.");
}
});

threadToInterrupt.start();
// 主线程睡眠2s后,中断该线程
TimeUnit.SECONDS.sleep(3);
threadToInterrupt.interrupt();
}
}

我们创建一个线程,他的执行逻辑是睡眠1分钟,而sleep方法是一个可中断方法,因此当我们手动中断后,会进入catch代码块:

interrupt

这里我们重点看一下interrupted标识符,他是线程对象内的一个变量,用于表示该线程是否被中断。而线程也会不断的去检查这个标识符:

若当前线程被中断,则该位设置为true。但有一个特殊的点,即,若该线程执行的方法为可中断方法,则在中断后,会自动清除该标志位。如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package lzw.Thread.review.basics;

import java.util.concurrent.TimeUnit;

/**
* 中断一个线程
*
* @author lzwgiter
* @since 2023/02/12
*/
public class UsingInterrupt {
public static void main(String[] args) throws InterruptedException {
Thread threadToInterrupt = new Thread(() -> {
try {
// sleep一分钟,该方法为可中断方法
TimeUnit.MINUTES.sleep(1);
} catch (InterruptedException ex) {
ex.printStackTrace();
System.out.println("I am Interrupted.");
}
});

threadToInterrupt.start();
// 主线程睡眠2s后,中断该线程
System.out.println(threadToInterrupt.isInterrupted());
TimeUnit.SECONDS.sleep(3);
threadToInterrupt.interrupt();
System.out.println(threadToInterrupt.isInterrupted());
}
}
可中断方法

可以看到,两次输出都是false。而若非中断方法,比如我们只是单纯的循环:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package lzw.Thread.review.basics;

import java.util.concurrent.TimeUnit;

/**
* 中断一个线程
*
* @author lzwgiter
* @since 2023/02/12
*/
public class UsingInterrupt {
public static void main(String[] args) throws InterruptedException {
Thread threadToInterrupt = new Thread(() -> {
while (true) {

}
});

threadToInterrupt.start();
// 主线程睡眠2s后,中断该线程
System.out.println(threadToInterrupt.isInterrupted());
TimeUnit.SECONDS.sleep(3);
threadToInterrupt.interrupt();
System.out.println(threadToInterrupt.isInterrupted());
System.out.println(threadToInterrupt.getState());
}
}
isInterrupted

则可以看到第二次输出为true,且当前程序并未结束,说明被中断的线程并非结束了自己的声明周期,上图中就显示为RUNNABLE而非TERMINATED。

此外,在Thread类中还有一个与这个方法很像的****:interrupted,该方法作用相同,但是会擦除flag标识位,如图:

interrupted方法

获取线程信息(线程对象、ID、上下文类加载器)

  • 线程对象
1
Thread.currentThread();
  • ID
1
Thread.currentThread().getId();

创建线程的ID从0开始,依次递增1。

  • 上下文类加载器
1
Thread.getContextClassLoader();

可以得知这个线程是由哪个类加载器加载的。如果没有修改上下文加载器的话,默认为父线程的类加载器。

经典生产者-消费者问题--线程间如何通信?

单线程间通信

我们先分别定义生产者和消费者,其中生产者生产对象时不需要时间,而消费者需要时间来消费一个对象。

  • 首先我们定义一个用于定义生产和消费的对象,并定义一个服务TransactionService,该服务中维护了一个单例的链表。
  • 然后分别定义生产者和消费者代码
  • 定义两个单线程分别执行生产和消费

具体代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
package lzw.Thread.review.entity;

/**
* 事务
*
* @author
* @since 2021/02/08
*/
public class Transaction {
long data;

public Transaction(long data) {
this.data = data;
}

public long getData() {
return data;
}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
package lzw.Thread.review.demo;

import lzw.Thread.review.entity.Transaction;

import java.util.LinkedList;

/**
* 生产者
*
* @author lzwgiter
* @since 2023/02/07
*/
public class Producer implements Runnable {

private final LinkedList<Transaction> pool = TransactionService.getQueue();

/*
全局唯一标识符
*/
private int currentId = 0;

private static final int MAX_SIZE = 5;

@Override
public void run() {
while (true) {
synchronized (pool) {
if (pool.size() == MAX_SIZE) {
System.out.println("当前队列已满");
try {
pool.wait();
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}
// 添加事务
pool.add(new Transaction(currentId++));
System.out.println(Thread.currentThread().getName() + "# 新事务已提交");
// 唤醒消费者线程
pool.notify();
}
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
package lzw.Thread.review.demo;

import lzw.Thread.review.entity.Transaction;

import java.util.LinkedList;

/**
* 消费者
*
* @author lzwgiter
* @since 2023/02/07
*/
public class Consumer implements Runnable {

private final LinkedList<Transaction> pool = TransactionService.getQueue();

@Override
public void run() {
while (true) {
try {
// 模拟消费事务
Thread.sleep(200);
} catch (InterruptedException ex) {
ex.printStackTrace();
}

synchronized (pool) {
if (pool.size() == 0) {
System.out.println("当前队列为空");
try {
pool.wait();
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}

System.out.println(Thread.currentThread().getName() + "# 消费事务:" + pool.removeFirst().getData());
// 通知生产者线程
pool.notify();
}
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
package lzw.Thread.review.demo;

import lzw.Thread.review.entity.Transaction;

import java.util.LinkedList;

/**
* @author lzwgiter
* @since 2023/02/07
*/
public class TransactionService {
private static final LinkedList<Transaction> POOL = new LinkedList<>();

private TransactionService() {}

public static LinkedList<Transaction> getQueue() {
return POOL;
}
}

这里核心的部分就是sychronized代码块,以及wait和notify操作。

首先需要明确的是,wait和notify操作必须在同步代码块内才可以使用。进入同步代码块后,首先我们会块持有的,是单例对象的Monitor。对于消费者而言,若队列为空,则加入该Monitor的等待集合中;对于生产者而言,如队列已满,则加入该Monitor的等待集合中。上述操作是通过wait函数完成的,此时将释放所持有的Monitor对象。

而当生产者新生产了数据以及消费者消费了数据之后,将会调用Notify方法,唤醒该Monitor等待集合中的线程。部分运行结果如下图:

单消费者单生产者

多线程间通信

上面我们使用Wait和Notify函数完成了两个单线程之间的同步,实际上,多线程之间也有类似的机制。

我们仍使用上述的Producer和Consumer,只需要将notify函数更换为notifyAll即可。运行结果如下:

多线程间通信

wait和notify、notifyAll时发生了什么

深入浅出的说明一下。这三个函数首先需要在同步代码块中使用,即前提是,持有了Monitor,否则会抛出IllegalMonitorStateException异常。而每一个Monitor都对应拥有一个线程休息室(wait set),具体实现与JDK实现有关,我们以oracle的JDK为例。其中,单线程和多线程通信的过程如图所示:

原理

而若是多线程下,我们使用notify()函数而非notifyAll(),则会导致不同步的问题。在oracle JDK的实现中,notify会随机唤醒一个线程:

多线程中的notify

举一个简单的例子,假设我们有两个消费者线程,当前队列为空。当生产者放入一个对象后,notify方法会随机的唤醒一个消费者线程。此时,该消费者线程在取出对象后,会再次调用notify方法,而此时唤醒的是另一个消费者线程,直接进入了消费代码片段,就会导致从空列表中获取数据的操作。如图:

多线程使用notify存在的隐患

使用线程池

由于线程的创建、启动、销毁都比较耗费系统资源,因此有了线程池。线程池中的线程可以重复利用,提高系统效率。JDK 1.5开始提供了ExecutorExecutorService接口以及对应的一系列实现。一个线程池具有如下关键要素:

  • 任务队列:用于缓存我们提交的Runnable的任务
  • 线程数量管理:关键数量包括:初始化线程数量init、自动扩充时的最大阈值数量max、维护核心线程数量core。关系为init<=core<=max
  • 线程工厂:用于创建线程

其中Executor接口定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public interface Executor {

/**
* Executes the given command at some time in the future. The command
* may execute in a new thread, in a pooled thread, or in the calling
* thread, at the discretion of the {@code Executor} implementation.
*
* @param command the runnable task
* @throws RejectedExecutionException if this task cannot be
* accepted for execution
* @throws NullPointerException if command is null
*/
void execute(Runnable command);
}

ExecutorService接口则在上述接口的基础上,提供了更多用于任务提交和管理的方法:

ExecutorService

重点学习该接口的两个实现:ThreadPoolExecutorScheduledThreadPoolExecutor

ThreadPoolExecutor

ScheduledThreadPoolExecutor

CATALOG
  1. 1. 什么是线程?
  2. 2. 如何使用线程
    1. 2.1. 继承Thread类
    2. 2.2. 实现Runnable接口
    3. 2.3. 区别
    4. 2.4. 常见线程API
      1. 2.4.1. sleep
      2. 2.4.2. yield
      3. 2.4.3. join
      4. 2.4.4. interrupt
      5. 2.4.5. 获取线程信息(线程对象、ID、上下文类加载器)
  3. 3. 经典生产者-消费者问题--线程间如何通信?
    1. 3.1. 单线程间通信
    2. 3.2. 多线程间通信
    3. 3.3. wait和notify、notifyAll时发生了什么
  4. 4. 使用线程池
    1. 4.1. ThreadPoolExecutor
    2. 4.2. ScheduledThreadPoolExecutor