前言

疫情期间看完了《Java并发编程实战》一书,看完之后觉得有些囫囵吞枣,没有留下深刻印象,因此写点小小总结,把握一下重点知识,尽量形成认知框架中的一部分。

原书的内容十分详细,也肯定写得比我好。我这里只做简单的概括,详细的还是去书里看比较好。

线程池

使用线程池的好处:

  • 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗
  • 提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行
  • 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控

线程池的类关系如图所示:


线程池的类关系图

Executor框架接口

Executor框架是一个根据一组执行策略调用、调度、执行和控制的异步任务的框架,目的是提供一种机制,将“任务提交”与“任务如何运行”分离开来

J.U.C中有三个Executor接口:

  • Executor:一个运行新任务的简单接口;
  • ExecutorService:扩展了Executor接口。添加了一些用来管理执行器生命周期和任务生命周期的方法;
  • ScheduledExecutorService:扩展了ExecutorService接口。支持Future和定期执行任务。

Executor接口

Executor接口只有一个execute方法,用来替代创建或启动线程的方法

1
2
3
public interface Executor {
void execute(Runnable command);
}

例如,使用Thread来创建并启动线程的代码如下:

1
2
Thread t = new Thread();
t.start();

而使用Executor来启动线程就是:

1
2
Thread t = new Thread();
executor.execute(t);

至于execute()方法如何执行代码,则要结合实际的线程池实现考虑

ExecutorService接口

ExecutorService接口继承自Executor接口,提供了管理终止的方法,以及可为跟踪一个或多个异步任务执行状况而生成Future的方法。增加了shutDown()shutDownNow()invokeAll()invokeAny()submit()等方法。如果需要支持即时关闭,也就是shutDownNow()方法,则任务需要正确处理中断。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 执行平缓的关闭过程:不再接受新的任务,同时等待已经提交的任务执行完成——包括哪些还未开始执行的任务
void shutdown();
// 执行粗暴的关闭过程:它将尝试取消所有运行中的任务,并且不再启动队列中尚未开始执行的任务
List<Runnable> shutdownNow();
// 等待ExecutorService到达终止状态
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
// 提交任务,通过返回的Future跟踪任务的运行状态
<T> Future<T> submit(Callable<T> task);
// 执行批量任务
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
// 执行批量任务中的任意一个
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;

ExecutorService关闭后提交的任务将由“拒绝执行处理器(RejectExecutionHandler)”来处理,它会抛弃任务,或者使得execute方法抛出一个未检查的RejectedExecutionException

ScheduledExecutorService接口

ScheduledExecutorService扩展ExecutorService接口并增加了schedule方法。

1
2
3
4
5
6
// 创建并执行在给定延迟后启用的一次性操作
public ScheduledFuture<?> schedule(Runnable command,
long delay, TimeUnit unit);
// 创建并执行在给定延迟后启用的 ScheduledFuture
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
long delay, TimeUnit unit);

调用schedule方法可以在指定的延时后执行一个Runnable或者Callable任务。ScheduledExecutorService接口还定义了按照指定时间间隔定期执行任务的scheduleAtFixedRate()方法和scheduleWithFixedDelay()方法。

ThreadPoolExecutor

ThreadPoolExecutor继承自AbstractExecutorService,也实现了ExecutorService接口。ThreadPoolExecutor构造方法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}

构造方法中的字段含义如下:

  • corePoolSize:核心线程数量,当有新任务在execute()方法提交时,会执行以下判断:
    • 如果运行的线程少于corePoolSize,则创建新线程来处理任务,即使线程池中的其他线程是空闲的;
    • 如果线程池中的线程数量大于等于corePoolSize且小于maximumPoolSize,则只有当workQueue满时才创建新的线程去处理任务;
    • 如果设置的corePoolSizemaximumPoolSize相同,则创建的线程池的大小是固定的,这时如果有新任务提交,若workQueue未满,则将请求放入workQueue中,等待有空闲的线程去从workQueue中取任务并处理;
    • 如果运行的线程数量大于等于maximumPoolSize,这时如果workQueue已经满了,则通过handler所指定的策略来处理任务;
      所以任务提交时,判断的顺序为 corePoolSize –> workQueue –> maximumPoolSize
  • maximumPoolSize:最大线程数量;
  • workQueue:等待队列,当任务提交时,如果线程池中的线程数量大于等于corePoolSize的时候,把该任务封装成一个Worker对象放入等待队列;
  • workQueue:保存等待执行的任务的阻塞队列,当提交一个新的任务到线程池以后, 线程池会根据当前线程池中正在运行着的线程的数量来决定对该任务的处理方式,主要有以下几种处理方式:
    • 直接切换:这种方式常用的队列是SynchronousQueue,但现在还没有研究过该队列,这里暂时还没法介绍;
    • 使用无界队列:一般使用基于链表的阻塞队列LinkedBlockingQueue。如果使用这种方式,那么线程池中能够创建的最大线程数就是corePoolSize,而maximumPoolSize就不会起作用了(后面也会说到)。当线程池中所有的核心线程都是RUNNING状态时,这时一个新的任务提交就会放入等待队列中。
    • 使用有界队列:一般使用ArrayBlockingQueue。使用该方式可以将线程池的最大线程数量限制为maximumPoolSize,这样能够降低资源的消耗,但同时这种方式也使得线程池对线程的调度变得更困难,因为线程池和队列的容量都是有限的值,所以要想使线程池处理任务的吞吐率达到一个相对合理的范围,又想使线程调度相对简单,并且还要尽可能的降低线程池对资源的消耗,就需要合理的设置这两个数量。
      • 如果要想降低系统资源的消耗(包括CPU的使用率,操作系统资源的消耗,上下文环境切换的开销等), 可以设置较大的队列容量和较小的线程池容量, 但这样也会降低线程处理任务的吞吐量。
      • 如果提交的任务经常发生阻塞,那么可以考虑通过调用setMaximumPoolSize() 方法来重新设定线程池的容量。
      • 如果队列的容量设置的较小,通常需要将线程池的容量设置大一点,这样CPU的使用率会相对的高一些。但如果线程池的容量设置的过大,则在提交的任务数量太多的情况下,并发量会增加,那么线程之间的调度就是一个要考虑的问题,因为这样反而有可能降低处理任务的吞吐量。
  • keepAliveTime:线程池维护线程所允许的空闲时间。当线程池中的线程数量大于corePoolSize的时候,如果这时没有新的任务提交,核心线程外的线程不会立即销毁,而是会等待,直到等待的时间超过了keepAliveTime
  • threadFactory:它是ThreadFactory类型的变量,用来创建新线程。默认使用Executors.defaultThreadFactory()来创建线程。使用默认的ThreadFactory来创建线程时,会使新创建的线程具有相同的NORM_PRIORITY优先级并且是非守护线程,同时也设置了线程的名称。
  • handler:它是RejectedExecutionHandler类型的变量,表示线程池的饱和策略。如果阻塞队列满了并且没有空闲的线程,这时如果继续提交任务,就需要采取一种策略处理该任务。线程池提供了4种策略:
    • AbortPolicy:直接抛出异常,这是默认策略;
    • CallerRunsPolicy:用调用者所在的线程来执行任务;
    • DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;
    • DiscardPolicy:直接丢弃任务;

execute方法

execute()方法用来提交任务,执行逻辑如下图:


execute()执行逻辑
  • 如果当前运行的线程少于corePoolSize,则会创建新的线程来执行新的任务;
  • 如果运行的线程个数等于或者大于corePoolSize,则会将提交的任务存放到阻塞队列workQueue中;
  • 如果当前workQueue队列已满的话,则会创建新的线程来执行任务;
  • 如果线程个数已经超过了maximumPoolSize,则会使用饱和策略RejectedExecutionHandler来进行处理。

ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor继承了ThreadPoolExecutor,拥有execute()submit()提交异步任务的基础功能;同时实现了ScheduledExecutorService接口,拥有延时执行任务和周期执行任务的功能。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// 达到给定的延时时间后,执行任务。这里传入的是实现Runnable接口的任务,
// 因此通过ScheduledFuture.get()获取结果为null
public ScheduledFuture<?> schedule(Runnable command,
long delay, TimeUnit unit);
// 达到给定的延时时间后,执行任务。这里传入的是实现Callable接口的任务,
// 因此,返回的是任务的最终计算结果
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
long delay, TimeUnit unit);
// 是以上一个任务开始的时间计时,period时间过去后,
// 检测上一个任务是否执行完毕,如果上一个任务执行完毕,
// 则当前任务立即执行,如果上一个任务没有执行完毕,则需要等上一个任务执行完毕后立即执行
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit);
// 当达到延时时间initialDelay后,任务开始执行。上一个任务执行结束后到下一次
// 任务执行,中间延时时间间隔为delay。以这种方式,周期性执行任务。
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit);

ScheduledThreadPoolExecutor中为了实现可延时执行任务和周期性执行任务的特性,任务会被转换成ScheduledFutureTask类,该类继承了FutureTask,并重写了run方法。

ScheduledThreadPoolExecutor也两个重要的内部类:DelayedWorkQueueScheduledFutureTaskDelayedWorkQueue实现了BlockingQueue接口,也就是一个阻塞队列;ScheduledFutureTask则是继承了FutureTask类,也表示该类用于返回异步任务的结果。

  • ScheduledFutureTask最主要的功能是根据当前任务是否具有周期性,对异步任务进行进一步封装。如果不是周期性任务(调用schedule方法)则直接通过run()执行,若是周期性任务,则需要在每一次执行完后,重设下一次执行的时间,然后将下一次任务继续放入到阻塞队列中。
  • DelayedWorkQueue是基于堆的数据结构,按照时间顺序将每个任务进行排序,将待执行时间越近的任务放在在队列的队头位置,以便于最先进行执行。

FutureTask基本操作

Executors框架体系中,FutureTask用来表示可获取结果的异步任务。FutureTask实现了Future接口,FutureTask提供了启动和取消异步任务,查询异步任务是否计算结束以及获取最终的异步任务的结果的一些常用的方法。通过get()方法来获取异步任务的结果,但是会阻塞当前线程直至异步任务执行结束。一旦任务执行结束,任务不能重新启动或取消,除非调用runAndReset()方法。

FutureTask的3种状态:

  • 未启动FutureTask.run()方法还没有被执行之前,FutureTask处于未启动状态。当创建一个 FutureTask,还没有执行 FutureTask.run()方法之前,FutureTask处于未启动状态。
  • 已启动FutureTask.run()方法被执行的过程中,FutureTask处于已启动状态。
  • 已完成FutureTask.run()方法执行结束,或者调用FutureTask.cancel(...)方法取消任务,或者在执行任务期间抛出异常,这些情况都称之为FutureTask的已完成状态。

主要方法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// 当 FutureTask 处于未启动或已启动状态时,执行 FutureTask.get()方法将导致调用线程阻塞
// 如果 FutureTask 处于已完成状态,调用 FutureTask.get()方法将导致调用线程立即返回结果或者抛出异常
get()

// 与get()相同,但是阻塞时是有时限的等待
get(long timeout, TimeUnit unit)

// 当 FutureTask 处于未启动状态时,执行 FutureTask.cancel()方法将此任务永远不会执行;
// 当 FutureTask 处于已启动状态时,执行 FutureTask.cancel(true)方法将以中断线程的方式来阻止任务
// 继续进行,如果执行 FutureTask.cancel(false)将不会对正在执行任务的线程有任何影响;
// 当FutureTask处于已完成状态时,执行 FutureTask.cancel(...)方法将返回 false。
cancel()

// 来自RunnableFuture接口,运行任务
run()

// 想执行多次会调用
runAndReset()

FutureTask除了实现Future接口外,还实现了Runnable接口。因此,FutureTask可以出现在任何替代Runnable的地方,也可以被线程直接调用FutureTask.run()

Future本身适用于解决多线程执行若干个任务,每个任务只需只需一次的情况,用Future可避免不必要的多次执行。

最后放一个线程池的示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class FutureRenderer {
private final ExecutorService = new ThreadPoolExecutor();

void renderPage() {
final List<ImageInfo> imageInfos = scanForImageInfo(source);
Callable<List<ImageData>> task =
new Callable<List<ImageData>> () {
public List<ImageData> call() {
List<ImageData> result = new ArrayList<ImageData>();
for (ImageInfo imageInfo : imageInfos) {
result.add(imageInfo.downloadImage());
}
return result;
}
};

Future<List<ImageData>> future = executor.submit(task);
renderText(source);

try {
List<ImageData> imageData = future.get();
for (ImageData data : imageData) {
renderImage(data);
}
} catch (InterruptedException e) {
// 重新设置线程的中断状态
Thread.currentThread().interrupt();
// 由于不需要结果,因此取消任务
future.cancel(true);
} catch (ExecutionException e) {
throw launderThrowable(e.getCause());
}
}
}

renderPage()方法负责渲染一个页面,其中页面可分为文字和图片两部分,图片下载时间较慢,因此通过多线程并行的方式加快速度。当然这个程序还能继续做并行,这里仅为了演示,不再深入,详见书本6.3.3节。

并发工具

这里主要记录一下书中提到的几种并发工具

AbstractQueuedSynchronizer(AQS)

AQS是实现CountDownLatchSemaphore等同步工具的基础,它负责管理同步器中的状态。

它管理了一个整数状态信息,可以通过getStatesetState以及compareAndSetStateprotected类型的方法进行操作。这个整数可以表示任何信息,如在ReentrantLock中表示所有者线程已经重复获取该锁的次数;Semaphore用它来表示剩余的许可数量;FutureTask用它来表示任务的状态,等等

如下代码给出了AQS中获取操作与释放操作的形式。根据同步器的不同,获取操作可以是一种独占操作(如ReentrantLock),也可以是一个非独占操作(如SemaphoreCountDownLatch)。

一个获取操作包括两部分。首先,同步器判断当前状态是否允许获得操作,如果是,则允许线程执行,否则获取操作将阻塞或失败。
其次就是更新同步器的状态,获取同步器的某个线程可能会对其他线程能否也获取该同步器造成影响。例如,当获取一个锁后,锁的状态将从“未被持有”变成“已被持有”,而从Semaphore中获取一个许可后,将把剩余许可的数量减一。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
boolean acquire() throws InterruptedException {
while (当前状态不允许获取操作) {
if (需要阻塞获取请求) {
如果当前线程不在队列中,则将其插入队列
阻塞当前线程
} else {
返回失败
}
}
可能更新同步器的状态
如果线程位于队列中,则将其移出队列
返回成功
}

void release() {
更新同步器的状态
if (新的状态允许某个被阻塞的线程获取成功) {
解除队列中一个或多个线程的阻塞状态
}
}

利用AQS实现闭锁的示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class OneShotLatch {
private final Sync sync = new Sync();
public void signal() { sync.releaseShared(0); }
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(0);
}

private class Sync extends AbstractQueuedSynchronizer {
protected int tryAcquireShared(int ignored) {
// 如果闭锁是开的(state == 1),那么这个操作将成功,否则失败
return (getState() == 1) ? 1 : -1;
}

protected boolean tryReleaseShared(int ignored) {
setState(1); // 现在打开闭锁
return true; // 现在其他的线程可以获取该闭锁
}
}
}

ReentrantLock

ReentrantLock只支持独占的获取操作,因此它实现了tryAcquiretryReleaseisHeldExclusively

ReentrantLock将同步状态用于保存锁获取操作的次数,并且还维护一个owner变量来保存当前所有者线程的标识符,只有在当前线程刚刚获取到锁,或者正要放锁的时候,才会修改这个变量。在tryRelease中检查owner域,从而确保当前线程在执行unlock操作之前已经获取到了锁;在tryAcquire中将使用这个域来区分获取操作是重入的还是竞争的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
protected boolean tryAcquire(int ignored) {
final Thread current = Thread.currentThread();
int c = getState();
// 无人持有锁
if (c == 0) {
// 利用原子更新尝试拿锁
if (compareAndSetState(0, 1)) {
owner = current;
return true;
}
// 已经有人持有锁,判断是否是自己重入
} else if (current == owner) {
setState(c + 1);
return true;
}
// 尝试拿锁失败
return false;
}

CountDownLatch

倒计时器

在多线程协作完成任务时,通常需要主线程等待其他线程运行完成后继续往下执行。这时通常可以使用Threadjoin方法,也可以使用CountDownLatch

先从CountDownLatch的构造方法看起

1
2
3
4
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}

其核心的内部对象就是sync,是一个继承了AQS的静态final私有类,它控制着倒计时器的同步逻辑

CountDownLatch的其他方法如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// 阻塞等待,直到当前CountDownLatch的计数变为0
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}

// 阻塞等待,直到当前CountDownLatch的计数变为0;或者等待时间超时
public boolean await(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

// 减小计数,如果计数为0,就释放所有调用await()阻塞的线程
public void countDown() {
sync.releaseShared(1);
}

// 返回当前计数
public long getCount() {
return sync.getCount();
}

示例代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class CountDownLatchDemo {
private static CountDownLatch startSignal = new CountDownLatch(1);
//用来表示裁判员需要维护的是6个运动员
private static CountDownLatch endSignal = new CountDownLatch(6);

public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(6);
for (int i = 0; i < 6; i++) {
executorService.execute(() -> {
try {
System.out.println(Thread.currentThread().getName() + " 运动员等待裁判员响哨!!!");
// 阻塞,等待主线程中的startSignal.countDown(),然后开始执行
startSignal.await();
System.out.println(Thread.currentThread().getName() + "正在全力冲刺");
// 计数--
endSignal.countDown();
System.out.println(Thread.currentThread().getName() + " 到达终点");
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
System.out.println("裁判员发号施令啦!!!");
// 计数--,线程池开始同时执行
startSignal.countDown();
// 阻塞,等待所有线程中的endSignal.countDown(),计数为0后开始执行
endSignal.await();
System.out.println("所有运动员到达终点,比赛结束!");
executorService.shutdown();
}
}

CyclicBarrier

循环栅栏

CyclicBarrierCountDownLatch功能类似,都有等待计数的功能,但功能比CountDownLatch更加强大。


CyclicBarrier示意图

CyclicBarrier的主要方法如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// 构造方法,指定线程均到达以后,执行barrierAction定义的操作(然后各线程再继续向下执行)
public CyclicBarrier(int parties, Runnable barrierAction)

// 等到所有的线程都到达指定的临界点
await() throws InterruptedException, BrokenBarrierException

// 与上面的await方法功能基本一致,只不过这里有超时限制,阻塞等待直至到达超时时间为止
await(long timeout, TimeUnit unit) throws InterruptedException,
BrokenBarrierException, TimeoutException

// 获取当前有多少个线程阻塞等待在临界点上
int getNumberWaiting()

// 用于查询阻塞等待的线程是否被中断
boolean isBroken()

// 将屏障重置为初始状态。如果当前有线程正在临界点等待的话,将抛出BrokenBarrierException
void reset()

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class CyclicBarrierDemo {
// 指定必须有3个运动员到达才行
private static CyclicBarrier barrier = new CyclicBarrier(3, () -> {
System.out.println("所有运动员入场,裁判员一声令下!!!!!");
});

public static void main(String[] args) {
System.out.println("运动员准备进场,全场欢呼............");
ExecutorService service = Executors.newFixedThreadPool(3);
for (int i = 0; i <= 3; i++) {
service.execute(() -> {
try {
System.out.println(Thread.currentThread().getName() + " 运动员,进场");
// 等指定数量的线程都到了之后再一起往下执行
barrier.await();
System.out.println(Thread.currentThread().getName() + " 运动员出发");
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
});
}
}
}

输出结果:

1
2
3
4
5
6
7
8
运动员准备进场,全场欢呼............
pool-1-thread-2 运动员,进场
pool-1-thread-1 运动员,进场
pool-1-thread-3 运动员,进场
所有运动员入场,裁判员一声令下!!!!!
pool-1-thread-1 运动员出发
pool-1-thread-3 运动员出发
pool-1-thread-2 运动员出发

CountDownLatch与CyclicBarrier的比较

CountDownLatchCyclicBarrier都是用于控制并发的工具类,都可以理解成维护的就是一个计数器,但是这两者还是各有不同侧重点的:

  • CountDownLatch一般用于某个线程 A 等待若干个其他线程执行完任务之后,它才执行;而CyclicBarrier一般用于一组线程互相等待至某个状态,然后这一组线程再同时执行;
  • 调用CountDownLatchcountDown方法后,当前线程并不会阻塞,会继续往下执行;而调用CyclicBarrierawait方法,会阻塞当前线程,直到CyclicBarrier指定的线程全部都到达了指定点的时候,才能继续往下执行;
  • CountDownLatch 方法比较少,操作比较简单,而CyclicBarrier提供的方法更多,比如能够通过getNumberWaiting()isBroken()这些方法获取当前多个线程的状态,并且CyclicBarrier的构造方法可以传入barrierAction,指定当所有线程都到达时执行的业务功能;
  • CountDownLatch 是不能复用的,而CyclicBarrier是可以复用的。

Semaphore

Semaphore可以理解为信号量,用于控制资源能够被并发访问的线程数量,以保证多个线程能够合理的使用特定资源。与Lock相似但不同,Semaphore支持同时有多个线程获取到资格。

Semaphore可以用于做流量控制,特别是公共资源有限的应用场景,比如数据库连接。

Semaphore的内部也保存有通过继承AQS实现的私有静态final类,负责实现同步的逻辑

Semaphore的主要方法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// 获取许可,如果无法获取到,则阻塞等待直至能够获取为止
void acquire() throws InterruptedException
// 同acquire方法功能基本一样,只不过该方法可以一次获取多个许可
void acquire(int permits) throws InterruptedException
// 释放许可
void release()
// 释放指定个数的许可
void release(int permits)
// 尝试获取许可,如果能够获取成功则立即返回true,否则,则返回false
boolean tryAcquire()
// 与tryAcquire方法一致,只不过这里可以指定获取多个许可
boolean tryAcquire(int permits)
// 尝试获取许可,如果能够立即获取到或者在指定时间内能够获取到,则返回true,否则返回false
boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException
// 与上一个方法一致,只不过这里能够获取多个许可
boolean tryAcquire(int permits, long timeout, TimeUnit unit)
// 返回当前可用的许可证个数
int availablePermits()
// 返回正在等待获取许可证的线程数
int getQueueLength()
// 是否有线程正在等待获取许可证
boolean hasQueuedThreads()
// 获取所有正在等待许可的线程集合
Collection<Thread> getQueuedThreads()

Semaphore也支持公平性和非公平性,默认是非公平性,这样吞吐量更高

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class SemaphoreDemo {

//表示老师只有10支笔
private static Semaphore semaphore = new Semaphore(5);

public static void main(String[] args) {
//表示50个学生
ExecutorService service = Executors.newFixedThreadPool(10);
for (int i = 0; i < 10; i++) {
service.execute(() -> {
try {
System.out.println(Thread.currentThread().getName()
+ " 同学准备获取笔......");
// 尝试获取资源,会阻塞
semaphore.acquire();
System.out.println(Thread.currentThread().getName()
+ " 同学获取到笔");
System.out.println(Thread.currentThread().getName()
+ " 填写表格ing.....");
TimeUnit.SECONDS.sleep(3);
// 释放资源
semaphore.release();
System.out.println(Thread.currentThread().getName()
+ " 填写完表格,归还了笔!");
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
service.shutdown();
}
}

Exchanger

Exchanger是一个用于线程间协作的工具类(模板类)。它提供了一个交换的同步点,在这个同步点两个线程能够交换数据。交换数据的方法就是exchange方法

主要方法如下:

1
2
3
4
5
// 当一个线程执行该方法的时候,会等待另一个线程也执行该方法,因此两个线程就都达到了同步点
// 将数据交换给另一个线程,同时返回获取的数据
V exchange(V x) throws InterruptedException
// 同上一个方法功能基本一样,只不过这个方法同步等待的时候,增加了超时时间
V exchange(V x, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class ExchangerDemo {
private static Exchanger<String> exchanger = new Exchanger();
public static void main(String[] args) {
//代表男生和女生
ExecutorService service = Executors.newFixedThreadPool(2);
service.execute(() -> {
try {
//男生对女生说的话
String girl = exchanger.exchange("我其实暗恋你很久了......");
System.out.println("女孩儿说:" + girl);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
service.execute(() -> {
try {
System.out.println("女生慢慢的从教室你走出来......");
TimeUnit.SECONDS.sleep(3);
//男生对女生说的话
String boy = exchanger.exchange("我也很喜欢你......");
System.out.println("男孩儿说:" + boy);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
}

输出结果:

1
2
3
女生慢慢的从教室你走出来......
女孩儿说:我也很喜欢你......
男孩儿说:我其实暗恋你很久了......

参考

由于这部分的内容相对复杂,且需要深入到源码理解,才能比较透彻,因此参考了别人的笔记
深入理解Java线程池:ThreadPoolExecutor
Github: Java-concurrency