第二部分 结构化并发应用程序
第六章 任务执行
6.1 在线程中执行任务
当围绕“任务执行”来设计应用程序结构时,第一步就是要找出清晰的任务边界。在理想情况下,各个任务之间是相互独立的:任务并不依赖于其他任务的状态、结果或边界效应。独立性有助于实现并发,因为如果存在足够多的处理资源,那么这些独立的任务都可以并行执行。为了在调度与负载均衡等过程中实现更高的灵活性,每项任务还应该表示应用程序的一小部分处理能力。
在正常的负载下,服务器应用程序应该同时表现出良好的吞吐量和快速的响应性。应用程序提供商希望程序支持尽可能多的用户,从而降低每个用户的服务成本,而用户则希望获得尽快的响应。而且,当负荷过载时,应用程序的性能应该是逐渐降低,而不是直接失败。要实现上述目标,应该选择清晰的任务边界以及明确的任务执行策略
6.1.1 串行地执行任务
在应用程序中可以通过多种策略来调度任务,而其中一些策略能够更好地利用潜的并发性。最简单的策略就是在单个线程中串行地执行各项任务。
在服务器应用程序中,串行处理机制通常都无法提供高吞吐率或快速响应性。也有一些例外,例如,当任务数量很少且执行时间很长时,或者当服务器只为单个用户提供服务,并且该客户每次只发出一个请求时——但大多数服务器应用程序并不是按照这种方式来工作的。
6.1.2 显式地为任务创建线程
通过为每个请求创建一个新的线程来提供服务,从而实现更高的响应性。如下所示:
public class ThreadPerTaskWebServer {
public static void main(String[] args) throws IOException {
ServerSocket socket = new ServerSocket(80);
while (true) {
final Socket connection = socket.accept();
Runnable task = new Runnable() {
public void run() {
handleRequest(connection);
}
};
new Thread(task).start();
}
}
private static void handleRequest(Socket connection) {
// request-handling logic here
}
}
ThreadPerTask WebServer 在结构上类似于前面的单线程版本一一主线程仍然不断地交替执行“接受外部连接”与“分发请求”等操作。对于每个连接,主循环都将创建一个新线程来处理请求,而不是在主循环中进行处理。由此可得出3 个主要结论:
- 任务处理过程从主线程中分离出来,使得主循环能够更快地重新等待下一个到来的连接。这使得程序在完成前面的请求之前可以接受新的请求,从而提高响应性。
- 任务可以并行处理,从而能同时服务多个请求。如果有多个处理器,或者任务由于某种原因被阻塞,例如等待I/O 完成、获取锁或者资源可用性等,程序的吞吐量将得到提高。
- 任务处理代码必须是线程安全的,因为当有多个任务时会并发地调用这段代码。
在正常负载情况下,“为每个任务分配一个线程”的方法能提升串行执行的性能。只要请求的到达速率不超出服务器的请求处理能力,那么这种方法可以同时带来更快的响应性和更高的吞吐率。
6.1.3 无限制创建线程的不足
在生产环境中,“为每个任务分配一个线程”这种方法存在一些缺陷,尤其是当需要创大量的线程时:
- 线程生命周期的开销非常高。线程的创建与销毁并不是没有代价的。根据平台的不同,实际的开销也有所不同,但线程的创建过程都会需要时间,延迟处理的请求,并且需要JVM 和操作系统提供一些辅助操作,每个请求创建一个新线程将消耗大量的计算资源。
- 资源消耗。活跃的线程会消耗系统资源,尤其是内存。如果可运行的线程数量多于可用处理器的数量,那么有些线程将闲置。大量空闲的线程会占用许多内存,给垃圾回收器带来压力,而且大量线程在竞争CPU 资源时还将产生其他的性能开销。
- 稳定性。在可创建线程的数批上存在一个限制。这个限制值将随着平台的不同而不同,并且受多个因素制约,包括JVM 的启动参数、Thread 构造函数中请求的栈大小,以及底层操作系统对线程的限制等。如果破坏了这些限制,那么很可能抛出OutOfMemoryError 异常,想从这种错误中恢复过来是非常危险的,更简单的办法是通过构造程序来避免超出这些限制。
在一定的范围内,增加线程可以提高系统的吞吐率,但如果超出了这个范围,再创建更多的线程只会降低程序的执行速度,并且如果过多地创建一个线程,那么整个应用程序将崩溃。要想避免这种危险,就应该对应用程序可以创建的线程数最进行限制,并且全面地测试应用程,从而确保在线程数量达到限制时,程序也不会耗尽资源。
6.2 Executor 框架
任务是一组逻辑工作单元,而线程则是使任务异步执行的机制。上述已经有两种通过线程来执行任务的策略:
- 把所有任务放在单个线程中串行执行:糟糕的响应性和吞吐量
- 将每个任务放在各自的线程中执行:复杂的资源管理
Executor 接口为灵活且强大的异步任务执行框架提供了基础,该框架能支持多种不同类型的任务执行策略。它提供了一种标准的方法将任务的提交过程与执行过程解耦开来,并用Runnable 来表示任务。Executor 的实现还提供了对生命周期的支持,以及统计信息收集、应用程序管理机制和性能监视等机制。
// Executor 接口
public interface Executor {
void execute(Runnable command);
}
Executor 基于生产者-消费者模式,提交任务的操作相当于生产者(生成待完成的工作单元),执行任务的线程则相当于消费者(执行完这些工作单元)。如果要在程序中实现一个生产者-消费者的设计,那么最简单的方式通常就是使用Executor 。
6.2.1 示例:基于Executor 的Web 服务器
基于Executor 来构建Web 服务器
public class TaskExecutionWebServer {
private static final int NTHREADS = 100;
private static final Executor exec = Executors.newFixedThreadPool(NTHREADS);
public static void main(String[] args) throws IOException {
ServerSocket socket = new ServerSocket(80);
while (true) {
final Socket connection = socket.accept();
Runnable task = new Runnable() {
public void run() {
handleRequest(connection);
}
};
exec.execute(task);
}
}
private static void handleRequest(Socket connection) {
// request-handling logic here
}
}
在 TaskExecutionWeb Server 中,通过使用Executor, 将请求处理任务的提交与任务的实际执行解耦开来,井且只需采用另一种不同的Executor 实现,就可以改变服务器的行为。改变Executor 实现或配置所带来的影响要远远小于改变任务提交方式带来的影响。通常, Executor的配置是一次性的,因此在部署阶段可以完成,而提交任务的代码却会不断地扩散到整个程序中,增加了修改的难度。
-
将上述代码修改为为每个请求创建一个新的线程来提供服务,仅需修改 Executor 的实现
public class ThreadPerTaskExecutor implements Executor { public void execute(Runnable r) { new Thread(r).start(); }; }
-
将上述代码修改为串行,仅需修改 Executor 的实现
public class WithinThreadExecutor implements Executor { public void execute(Runnable r) { r.run(); }; }
6.2.2 执行策略
通过将任务的提交与执行解耦开来,从而无须太大的困难就可以为某种类型的任务指定和修改执行策略。在执行策略中定义了任务执行的“What 、Where 、When 、How" 等方面,包括:
- 在什么(What) 线程中执行任务?
- 任务桉照什么(What) 顺序执行(FIFO 、LIFO 、优先级) ?
- 有多少个(How Many) 任务能并发执行?
- 在队列中有多少个(How Many) 任务在等待执行?
- 如果系统由于过载而需要拒绝一个任务,那么应该选择哪一个(Which) 任务?另外,如何(How) 通知应用程序有任务被拒绝?
- 在执行一个任务之前或之后,应该进行哪些(What) 动作?
各种执行策略都是一种资源管理工具,最佳策略取决于可用的计算资源以及对服务质益的需求。通过限制并发任务的数量,可以确保应用程序不会由于资源耗尽而失败,或者由于在稀缺资源上发生竞争而严重影响性能 。通过将任务的提交与任务的执行策略分离开来,有助于在部署阶段选择与可用硬件资源最匹配的执行策略。
每当看到下面这种形式的代码时:
new Thread(runnable).start()
并且你希望获得一种史灵活的执行策略时,请考虑使用Executor 来代替Thread 。
6.2.3 线程池
线程池,从字面含义来看,是指管理一组同构工作线程的资源池。线程池是与工作队列(Work Queue) 密切相关的,其中在工作队列中保存了所有等待执行的任务。工作者线程(Worker Thread) 的任务很简单:从工作队列中获取一个任务,执行任务,然后返回线程池并等待下一个任务。
“在线程池中执行任务”比“为每个任务分配一个线程”优势更多。通过重用现有的线程而不是创建新线程,可以在处理多个请求时分摊在线程创建和销毁过程中产生的巨大开销。另一个额外的好处是,当请求到达时,工作线程通常已经存在,因此不会由于等待创建线程而延迟任务的执行,从而提高了响应性。通过适当调整线程池的大小,可以创建足够多的线程以便使处理器保持忙碌状态,同时还可以防止过多线程相互竞争资源而使应用程序耗尽内存或失败。
类库提供了一个灵活的线程池以及一些有用的默认配置。可以通过调用Executors 中的静态工厂方法之一来创建一个线程池:
- newFixedThreadPool:newFixedThreadPool 将创建一个固定长度的线程池,每当提交一个任务时就创建一个线程,直到达到线程池的最大数量,这时线程池的规模将不再变化(如果某个线程由于发生了未预期的Exception 而结束,那么线程池会补充一个新的线程)。
- newCachedThreadPool:newCachedThreadPool 将创建一个可缓存的线程池,如果线程池的当前规模超过了处理需求时,那么将回收空闲的线程,而当需求增加时,则可以添加新的线程,线程池的规模不存在任何限制。
- newSingleThreadExecutor:newSingleThreadExecutor 是一个单线程的Executor, 它创建单个工作者线程来执行任务,如果这个线程异常结束,会创建另一个线程来替代。newSingleThreadExecutor能确保依照任务在队列中的顺序来串行执行(例如FIFO 、LIFO 、优先级)。
- newScheduledThreadPool: newScheduledThreadPool 创建了一个固定长度的线程池,而且以延迟或定时的方式来执行任务,类似于Timer。
- newFixedThreadPool 和 newCachedThreadPool 这两个工厂方法返回通用的ThreadPoolExecutor实例,这些实例可以直接用来构造专门用途的executor 。
从“为每任务分配一个线程”策略变成基于线程池的策略,将对应用程序的稳定性产生重大的影响: Web 服务器不会再在高负载情况下失败。由于服务器不会创建数万个线程来争夺有限的CPU 和内存资源,因此服务器的性能将平缓地降低。通过使用Executor, 可以实现各种调优、管理、监视、记录日志、错误报告和其他功能,如果不使用任务执行框架,那么要增加这些功能是非常困难的。
6.2.4 Executor 的生命周期
我们已经知道如何创建一个Executor, 但并没有讨论如何关闭它。Executor 的实现通常会创建线程来执行任务。但JVM 只有在所有(非守护)线程全部终止后才会退出。因此,如果无法正确地关闭Executor, 那么JVM 将无法结束。
由于Executor 以异步方式来执行任务,因此在任何时刻,之前提交任务的状态不是立即见的。有些任务可能已经完成,有些可能正在运行,而其他的任务可能在队列中等待执行。关闭应用程序时,可能采用最平缓的关闭形式(完成所有已经启动的任务,并且不再接受任何新的任务),也可能采用最粗暴的关闭形式(直接关掉机房的电源),以及其他各种可能的形式。既然Executor 是为应用程序提供服务的,因而它们也是可关闭的(无论采用平缓的方式还是粗暴的方式),并将在关闭操作中受影响的任务的状态反馈给应用程序。
为了解决执行服务的生命周期问题 , Executor 扩展了 ExecutorService 接口 , 添加了一些用于生命周期管理的方法 ( 同时还有一些用于任务提交的便利方法 ) 。
public interface ExecutorService extends Executor {
void shutdown();
List<Runnable> shutdownNow();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
ExecutorService 的生命周期有3 种状态:运行、关闭和已终止。ExecutorService 在初始创建时处于运行状态。shutdown 方法将执行平缓的关闭过程:不再接受新的任务,同时等待已经提交的任务执行完成一包括那些还未开始执行的任务。shutdownNow 方法将执行粗暴的关闭过程:它将尝试取消所有运行中的任务,并且不再启动队列中尚未开始执行的任务。
在ExecutorService 关闭后提交的任务将由“拒绝执行处理器(Rejected Execution Handler)"来处理,它会抛弃任务,或者使得execute 方法抛出一个未检查的 RejectedExecutionException。等所有任务都完成后, ExecutorService 将转人终止状态。可以调用awaitTermination 来等待ExecutorService 到达终止状态,或者通过调用isTerminated 来轮询ExecutorService 是否已经终止。通常在调用awaitTermination 之后会立即调用shutdown, 从而产生同步地关闭ExecutorService 的效果。
6.2.5 延迟任务与周期任务
Timer 类负责管理延迟任务(“在lOOms 后执行该任务”)以及周期任务(“每IOms 执行一次该任务”)。然而, Timer 存在一些缺陷,因此应该考虑使用ScheduledThreadPoolExecutor 来代替它包可以通过ScheduledThreadPoolExecutor 的构造函数或newScheduledThreadPool 工厂方法来创建该类的对象。
Timer 在执行所有定时任务时只会创建一个线程。如果某个任务的执行时间过长,那么将破坏其他TimerTask 的定时精确性。例如某个周期TimerTask 需要每IOms 执行一次,而另一个TimerTask 需要执行40ms, 那么这个周期任务或者在40ms 任务执行完成后快速连续地调用4次,或者彻底“丢失”4 次调用。线程池能弥补这个缺陷,它可以提供多个线程来执行延时任务和周期任务。
Timer 的另一个问题是,如果TimerTask 抛出了一个未检查的异常,那么Timer 将表现出糟糕的行为。Timer 线程并不捕获异常,因此当TimerTask 抛出未检查的异常时将终止定时线程。这种情况下, Timer 也不会恢复线程的执行,而是会错误地认为整个Timer 都被取消了。因此,已经被调度但尚未执行的TimerTask 将不会再执行,新的任务也不能被调度。又叫“线程泄露”。
// 错误的Timer 行为
public class OutOfTime {
public static void main(String[] args) throws Exception {
Timer timer = new Timer();
timer.schedule(new ThrowTask(), 1);
SECONDS.sleep(1);
timer.schedule(new ThrowTask(), 1);
SECONDS.sleep(5);
}
static class ThrowTask extends TimerTask {
public void run() {
throw new RuntimeException();
}
}
}
你可能认为程序会运行6 秒后退出,但实际际情况是运行1 秒就结束了,并抛出了一个异常消息”Timer already cancelled” 。ScheduledThreadPoolExecutor 能正确处理这些表现出错误行为的任务。在Java 5.0 或更高的JDK 中,将很少使用Timer 。
如果要构建自己的调度服务,那么可以使用DelayQueue, 它实现了BlockingQueue, 并为ScheduledThreadPoolExecutor 提供调度功能。DelayQueue 管理着一组Delayed 对象。每个Delayed 对象都有一个相应的延迟时间:在DelayQueue 中,只有某个元素逾期后,才能从DelayQueue 中执行take 操作。从DelayQueue 中返回的对象将根据它们的延迟时间进行排序。
6.3 找出可利用的并行性
Executor 框架帮助指定执行策略,但如果要使用Executor, 必须将任务表述为一个Runnable 。在大多数服务器应用程序中都存在一个明显的任务边界:单个客户请求。
6.3.1 示例:串行的页面渲染器
最简单的方法就是对HTML 文档进行串行处理。当遇到文本标签时,将其绘制到图像缓存中。当遇到图像引用时,先通过网络获取它,然后再将其绘制到图像缓存中。
另一种串行执行方法更好一些,它先绘制文本元素,同时为图像预留出矩形的占位空间,在处理完了第一遍文本后,程序再开始下载图像,并将它们绘制到相应的占位空间中。
// 串行地渲染页面元素
public abstract class SingleThreadRenderer {
void renderPage(CharSequence source) {
renderText(source);
List<ImageData> imageData = new ArrayList<ImageData>();
for (ImageInfo imageInfo : scanForImageInfo(source))
imageData.add(imageInfo.downloadImage());
for (ImageData data : imageData)
renderImage(data);
}
interface ImageData {
}
interface ImageInfo {
ImageData downloadImage();
}
abstract void renderText(CharSequence s);
abstract List<ImageInfo> scanForImageInfo(CharSequence s);
abstract void renderImage(ImageData i);
}
图像下载过程的大部分时间都是在等待IIO 操作执行完成,在这期间CPU 几乎不做任何工作。因此,这种串行执行方法没有充分地利用CPU, 使得用户在看到最终页面之前要等待过长的时间。通过将问题分解为多个独立的任务并发执行,能够获得更高的CPU 利用率和响应灵敏度。
6.3.2 携带结果的任务Callable 与Future
Executor 框架使用Runnable 作为其基本的任务表示形式。Runnable 是一种有很大局限的抽象,虽然run 能写入到日志文件或者将结果放入某个共享的数据结构,但它不能返回一个值或抛出一个受检查的异常。
许多任务实际上都是存在延迟的计算一一执行数据库查询,从网络上获取资源,或者计算某个复杂的功能。对于这些任务, Callable 是一种更好的抽象:它认为主入口点(即call) 将返回一个值,井可能抛出一个异常。e在Executor 中包含了一些辅助方法能将其他类型的任务封装为一个Callable,如Runnable 和 Java.security.PrivilegedAction。
Runnable 和Callable 描述的都是抽象的计算任务。这些任务通常是有范围的,即都有一个明确的起始点,并且最终会结束。Executor 执行的任务有4 个生命周期阶段:创建、交、开始和完成。由于有些任务可能要执行很长的时间,因此通常希望能够取消这些任务。在Executor 框架中,已提交但尚未开始的任务可以取消,但对于那些已经开始执行的任务,只有当它们能响应中断时,才能取消。取消一个已经完成的任务不会有任何影响。
Future 表示一个任务的生命周期,井提供了相应的方法来判断是否已经完成或取消,以及获取任务的结果和取消任务等。在Future 规范中包含的隐含意义是,任务的生命周期只能前进,不能后退,就像ExecutorService 的生命周期一样。当某个任务完成后,它就永远停留在“完成“状态上。
public interface Callable<V>{
V call() throws Exception;
}
public interface Future<V>{
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException, CancellationException;
V get(long timeout, TimeUnit unit) throws InterruptedException,
ExecutionException, CancellationException, TimeoutException;
}
get 方法的行为取决于任务的状态(尚未开始、正在运行、已完成)。如果任务已经完成,那么get 会立即返回或者抛出一个Exception, 如果任务没有完成,那么get 将阻塞并直到任务完成。如果任务抛出了异常,那么get 将该异常封装为ExecutionException 并重新抛出。如果任务被取消,那么get 将抛出CancellationException。如果get 抛出了ExecutionException, 那么可以通过getCause 来获得被封装的初始异常。
可以通过许多种方法创建一个Future 来描述任务。ExecutorService 中的所有submit 方法都将返回一个Future, 从而将一个Runnable 或Callable 提交给Executor,并得到一个Future 用来获得任务的执行结果或者取消任务。还可以显式地为某个指定的Runnable 或Callable 实例化一个FutureTask 。(由于FutureTask 实现了Runnable, 因此可以将它提交给Executor 来执行,或者直接调用它的run 方法。)
从Java 6 开始, ExecutorService 实现可以改写AbstractExecutorService 中的newTaskFor方法,从而根据已提交的Runnable 或Callable 来控制Future 的实例化过程。在默认实现中仅创建了一个新的FutureTask
// ThreadPoolExecutor 中newTaskFor 的默认实现
protected <T> RunnableFuture<T> newTaskFor(Callable<T> task){
return new FutureTask<T>(task);
}
在将Runnable 或Callable 提交到Executor 的过程中,包含了一个安全发布过程,即将Runnable 或Callable 从提交线程发布到最终执行任务的线程。类似地,在设置Future 结果的过程中也包含了一个安全发布,即将这个结果从计算它的线程发布到任何通过get获得它的线程。
6.3.3 示例:使用Future 实现页面渲染器
为了使页面渲染器实现更高的并发性,首先将渲染过程分解为两个任务,一个是渲染所有的文本,另一个是下载所有的图像。(因为其中一个任务是CPU 密集型,而另一个任务 I/O,密集型,因此这种方法即使在单CPU 系统上也能提升性能。)
// 使用Future 等待图像下载
public abstract class FutureRenderer {
private final ExecutorService executor = Executors.newCachedThreadPool();
void renderPage(CharSequence source) {
final List<ImageInfo> imageInfos = scanForImageInfo(source);
Callable<List<ImageData>> task =
new Callable<List<ImageData>>() {
public List<ImageData> call() {
List<ImageData> result = new ArrayList<ImageData>();
for (ImageInfo imageInfo : imageInfos)
result.add(imageInfo.downloadImage());
return result;
}
};
Future<List<ImageData>> future = executor.submit(task);
renderText(source);
try {
List<ImageData> imageData = future.get();
for (ImageData data : imageData)
renderImage(data);
} catch (InterruptedException e) {
// 重新设置线程的中断状态
Thread.currentThread().interrupt();
// 由于不需要结果,因此取消任务
future.cancel(true);
} catch (ExecutionException e) {
throw launderThrowable(e.getCause());
}
}
interface ImageData {}
interface ImageInfo {
ImageData downloadImage();
}
abstract void renderText(CharSequence s);
abstract List<ImageInfo> scanForImageInfo(CharSequence s);
abstract void renderImage(ImageData i);
}
在程序FutureRenderer中创建了一个Callable 来下载所有的图像,并将其提交到一个ExecutorService 。这将返回一个描述任务执行情况的Future 。当主任务需要图像时,它会等待Future.get 的调用结果。如果幸运的话,当开始请求时所有图像就已经下载完成了,即使没有,至少图像的下载任务也已经提前开始了。
get 方法拥有“状态依赖"的内在特性,因而调用者不需要知道任务的状态,此外在任务提交和获得结果中包含的安全发布属性也确保了这个方法是线程安全的。Future.get 的异常处理代码将处理两个可能的问题:任务遇到一个Exception, 或者调用get 的线程在获得结果之前被中断。
6.3.4 在异构任务并行化中存在的局限
通过对异构任务进行并行化来获得重大的性能提升是很困难的。
两个人可以很好地分担洗碗的工作。然而,要将不同类型的任务平均分配给每个工人却并不容易。当人数增加时,如何确保他们能帮忙而不是妨碍其他人工作,或者在重新分配工作时,并不是容易的事情。如果没有在相似的任务之间找出细粒度的并行性,那么这种方法带来的好处将减少。
当在多个工人之间分配异构的任务时,还有一个问题就是各个任务的大小可能完全不同。最后,当在多个工人之间分解任务时,还需要一定的任务协调开销:为了使任务分解能提高性能,这种开销不能高于并行性实现的提升。
只有当大量相互独立且同构的任务可以并发进行处理时,才能体现出将程序的工作负载分配到多个任务中带来的真正性能提升。
6.3.5 CompletionService:Executon 与BlockingQueue
如果向Executor 提交了一组计算任务,并且希望在计算完成后获得结果,那么可以保留与每个任务关联的Future, 然后反复使用get 方法,同时将参数timeout 指定为o, 从而通过轮询来判断任务是否完成。这种方法虽然可行,但却有些繁琐。幸运的是,还有一种更好的方法:完成服务(CompletionService) 。
CompletionService 将Executor 和 BlockingQueue 的功能融合在一起。你可以将Callable 任务提交给它来执行,然后使用类似于队列操作的take 和poll 等方法来获得已完成的结果,而这些结果会在完成时将被封装为Future 。ExecutorCompletionService 实现了CompletionService,并将计算部分委托给一个Executor 。
ExecutorCompletionService 的实现非常简单。在构造函数中创建一个BlockingQueue 来保存计算完成的结果。当计算完成时,调用FutureTask 中的done 方法。当提交某个任务时,该任务将首先包装为一个QueueingFuture, 这是FutureTask 的一个子类,然后再改写子类的done方法,并将结果放入BlockingQueue 中,如下所示。take 和poll 方法委托给了BlockingQueue, 这些方法会在得出结果之前阻塞。
// 由ExecutorCompletionService 使用的 QueueingFuture 类
private class QueueingFuture<V> extends FutureTask<V> {
QueueingFuture(Callable<V> c) {
super(c);
}
QueueingFuture(Runnable t, V r) {
super(t, r);
}
protected void done() {
completionQueue.add(this);
}
}
6.3.6 示例:使用CompletionService 实现页面渲染器
可以通过CompletionService 从两个方面来提高页面渲染器的性能:缩短总运行时间以及提高响应性。
- 为每一幅图像的下载都创建一个独立任务,并在线程池中执行它们,从而将串行的下载过程转换为并行的过程:这将减少下载所有图像的总时间。
- 通过从CompletionService 中获取结果以及使每张图片在下载完成后立刻显示出来,能使用户获得一个更加动态和更高响应性的用户界面。
// 使用Completion Service, 使页面元素在下载完成后立即显示出来
public abstract class Renderer {
private final ExecutorService executor;
Renderer(ExecutorService executor) {
this.executor = executor;
}
void renderPage(CharSequence source) {
final List<ImageInfo> info = scanForImageInfo(source);
CompletionService<ImageData> completionService =
new ExecutorCompletionService<ImageData>(executor);
for (final ImageInfo imageInfo : info)
completionService.submit(new Callable<ImageData>() {
public ImageData call() {
return imageInfo.downloadImage();
}
});
renderText(source);
try {
for (int t = 0, n = info.size(); t < n; t++) {
Future<ImageData> f = completionService.take();
ImageData imageData = f.get();
renderImage(imageData);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
throw launderThrowable(e.getCause());
}
}
interface ImageData {}
interface ImageInfo {
ImageData downloadImage();
}
abstract void renderText(CharSequence s);
abstract List<ImageInfo> scanForImageInfo(CharSequence s);
abstract void renderImage(ImageData i);
}
多个ExecutorCompletionService 可以共享一个Executor, 因此可以创建一个对于特定计算私有,又能共享一个公共Executor 的ExecutorCompletionService 。因此, CompletionService 的作用就相当于一组计算的句柄,这与Future 作为单个计算的旬柄是非常类似的。通过记录提交给CompletionService 的任务数量,并计算出已经获得的已完成结果的数量,即使使用一个共享的Executor, 也能知道已经获得了所有任务结果的时间。
6.3.7 为任务设置时限
有时候,如果某个任务无法在指定时间内完成,那么将不再需要它的结果,此时可以放弃这个任务。
在有限时间内执行任务的主要困难在于,要确保得到答案的时间不会超过限定的时间,或者在限定的时间内无法获得答案。在支持时间限制的 Future.get 中支持这种需求:当结果可用时,它将立即返回,如果在指定时限内没有计算出结果,那么将抛出 TimeoutException 。
在使用限时任务时需要注意,当这些任务超时后应该立即停止,从而避免为继续计算一个不再使用的结果而浪费计算资源。要实现这个功能,可以由任务本身来管理它的限定时间,并且在超时后中止执行或取消任务。此时可再次使用Future, 如果一个限时的get 方法抛出了TimeoutException, 那么可以通过Future 来取消任务。如果编写的任务是可取消的,那么可以提前中止它,以免消耗过多的资源。
public class RenderWithTimeBudget {
private static final Ad DEFAULT_AD = new Ad();
private static final long TIME_BUDGET = 1000;
private static final ExecutorService exec = Executors.newCachedThreadPool();
Page renderPageWithAd() throws InterruptedException {
long endNanos = System.nanoTime() + TIME_BUDGET;
Future<Ad> f = exec.submit(new FetchAdTask());
// 在等持广告的同时显示页面
Page page = renderPageBody();
Ad ad;
try {
// 只等持指定的时间长度
long timeLeft = endNanos - System.nanoTime();
ad = f.get(timeLeft, NANOSECONDS);
} catch (ExecutionException e) {
ad = DEFAULT_AD;
} catch (TimeoutException e) {
ad = DEFAULT_AD;
f.cancel(true);
}
page.setAd(ad);
return page;
}
Page renderPageBody() { return new Page(); }
static class Ad {}
static class Page {
public void setAd(Ad ad) { }
}
static class FetchAdTask implements Callable<Ad> {
public Ad call() {
return new Ad();
}
}
}
限时Future.get 的一种典型应用。在它生成的页面中包括响应用求的内容以及从广告服务器上获得的广告。它将获取广告的任务提交给一个Executor, 然后计算剩余的文本页面内容,最后等待广告信息,直到超出指定的时间e 。如果get 超时,那么将取消广告获取任务,并转而使用默认的广告信息。
6.3.8 示例:旅行预定门户网站
“预定时间”方法可以很容易地扩展到任意数量的任务上。考虑这样一个旅行预定门户网站:用户输入旅行的日期和其他要求,门户网站获取并显示来自多条航线、旅店或汽车租赁公司的报价。在获取不同公司报价的过程中,可能会调用Web 服务、访问数据库、执行一个EDI事务或其他机制。在这种情况下,不宜让页面的响应时间受限于最慢的响应时间,而应该只显示在指定时间内收到的信息。对于没有及时响应的服务提供者,页面可以忽略它们,或者显一个提示信息,例如“Did not hear from Air Java in time。”
从一个公司获得报价的过程与从其他公司获得报价的过程无关,因此可以将获取报价的过程当成一个任务,从而使获得报价的过程能并发执行。创建n 个任务,将其提交到一个线程池,保留n 个Future, 并使用限时的get 方法通过Future 串行地获取每一个结果,这一切都很简单,但还有一个更简单的方法一一invokeAll 。
程序中使用了支持限时的invokeAll, 将多个任务提交到一个ExecutorService 并获得结果。InvokeAll 方法的参数为一组任务,并返回一组Future 。这两个集合有着相同的结构。invokeAll 按照任务集合中迭代器的顺序将所有的Future 添加到返回的集合中,从而使调用者能将各个Future 与其表示的Callable 关联起来。当所有任务都执行完毕时,或者调用线程被中断时,又或者超过指定时限时, invokeAll 将返回。当超过指定时限后,任何还未完成的任务都会取消。当invokeAll 返回后,每个任务要么正常地完成,要么被取消,而客户端代码可以调用get 或 isCancelled 来判断究竟是何种情况。
// 在预定时间内请求旅游报价
public class TimeBudget {
private static ExecutorService exec = Executors.newCachedThreadPool();
public List<TravelQuote> getRankedTravelQuotes(TravelInfo travelInfo, Set<TravelCompany> companies,
Comparator<TravelQuote> ranking, long time, TimeUnit unit)
throws InterruptedException {
List<QuoteTask> tasks = new ArrayList<QuoteTask>();
for (TravelCompany company : companies)
tasks.add(new QuoteTask(company, travelInfo));
List<Future<TravelQuote>> futures = exec.invokeAll(tasks, time, unit);
List<TravelQuote> quotes = new ArrayList<TravelQuote>(tasks.size());
Iterator<QuoteTask> taskIter = tasks.iterator();
for (Future<TravelQuote> f : futures) {
QuoteTask task = taskIter.next();
try {
quotes.add(f.get());
} catch (ExecutionException e) {
quotes.add(task.getFailureQuote(e.getCause()));
} catch (CancellationException e) {
quotes.add(task.getTimeoutQuote(e));
}
}
Collections.sort(quotes, ranking);
return quotes;
}
}
class QuoteTask implements Callable<TravelQuote> {
private final TravelCompany company;
private final TravelInfo travelInfo;
public QuoteTask(TravelCompany company, TravelInfo travelInfo) {
this.company = company;
this.travelInfo = travelInfo;
}
TravelQuote getFailureQuote(Throwable t) {
return null;
}
TravelQuote getTimeoutQuote(CancellationException e) {
return null;
}
public TravelQuote call() throws Exception {
return company.solicitQuote(travelInfo);
}
}
interface TravelCompany {
TravelQuote solicitQuote(TravelInfo travelInfo) throws Exception;
}
interface TravelQuote {}
interface TravelInfo {}
小结
通过围绕任务执行来设计应用程序,可以简化开发过程,并有助于实现并发。Executor 框架将任务提交与执行策略解耦开来,同时还支持多种不同类型的执行策略。当需要创建线程来执行任务时,可以考虑使用Executor。要想在将应用程序分胪为不同的任务时获得最大的好处,必须定义清晰的任务边界。某些应用程序中存在着比较明显的任务边界,而在其他一些程序中则需要进一步分析才能揭示出粒度更细的并行性。
第七章 取消与关闭
任务和线程的启动很容易。在大多数时候,我们都会让它们运行直到结束,或者让它们自行停止。然而,有时候我们希望提前结束任务或线程,或许是因为用户取消了操作,或者应用程序需要被快速关闭。
要使任务和线程能安全、快速、可靠地停止下来,井不是一件容易的事。Java 没有提供任何机制来安全地终止线程 。但它提供了中断(Interruption) ,这是一种协作机制,能够使一个线程终止另一个线程的当前工作。
7.1 任务取消
如果外部代码能在某个操作正常完成之前将其置入“完成“状态,那么这个操作就可以称为可取消的(Cancellable) 。取消某个操作的原因很多:
用户请求取消。用户点击图形界面程序中的“取消”按钮,或者通过管理接口来发出取消请求,例如JMX (Java Management Extensions) 。
有时间限制的操作。例如,某个应用程序需要在有限时间内搜索问题空间,并在这个时间内选择最佳的解决方案。当计时器超时时,需要取消所有正在搜索的任务。
应用程序事件。例如,应用程序对某个问题空间进行分解井搜索,从而使不同的任务可以搜索问题空间中的不同区域。当其中一个任务找到了解决方案时,所有其他仍在搜索的任务都
错误。网页爬虫程序搜索相关的页面,并将页面或摘要数据保存到硬盘。当一个爬虫任务发生错误时(例如,磁盘空间已满),那么所有搜索任务都会取消,此时可能会记录它们的前状态,以便稍后重新启动。
关闭。当一个程序或服务关闭时,必须对正在处理和等待处理的工作执行某种操作。在平缓的关闭过程中,当前正在执行的任务将继续执行直到完成,而在立即关闭过程中,当前的任务则可能取消。
在Java 中没有一种安全的抢占式方法来停止线程,因此也就没有安全的抢占式方法来停止任务。只有一些协作式的机制,使请求取消的任务和代码都遵循一种协商好的协议。
其中一种协作机制能设置某个“已请求取消(Cancellation Requested)" 标志,而任务将定期地查看该标志。如果设置了这个标志,那么任务将提前结束。
一个可取消的任务必须拥有取消策略(Cancellation Policy) ,在这个策略中将详细地定义取消操作的“How” 、“When" 以及“What",即其他代码如何(How) 请求取消该任务,任务在何时(When) 检查是否已经请求了取消,以及在响应取消请求时应该执行哪些(What) 操作。
7.1.1 中断
下方程序中,生产者线程生成素数,并将它们放入一个阻塞队列。如果生产者的速度超过了消费者的处理速度,队列将被填满, put方法也会阻塞。当生产者在put 方法中阻塞时,如果消费者希望取消生产者任务,它可以调用cancel 方法来设置cancelled 标志,但此时生产者却永远不能检查这个标志,因为它无法从阻塞的put 方法中恢复过来(因为消费者此时已经停止从队列中取出素数,所以put 方法将一直保持阻塞状态)。
// BAD
class BrokenPrimeProducer extends Thread {
private final BlockingQueue<BigInteger> queue;
private volatile boolean cancelled = false;
BrokenPrimeProducer(BlockingQueue<BigInteger> queue) {
this.queue = queue;
}
public void run() {
try {
BigInteger p = BigInteger.ONE;
while (!cancelled)
queue.put(p = p.nextProbablePrime());
} catch (InterruptedException consumed) {
}
}
public void cancel() {
cancelled = true;
}
}
线程中断是一种协作机制,线程可以通过这种机制来通知另一个线程,告诉它在合适的或者可能的情况下停止当前工作,并转而执行其他的工作。
在 Java 的 API 或语言規范中 , 并没有将中断与任何取消语义关联起来 , 但实际上 , 如果在取消之外的其他操作中使用中断 , 哪么都是不合适的 , 并且很难支撑起更大的应用 。
每个线程都有一个boolean 类型的中断状态。当中断线程时,这个线程的中断状态将被设置为true 。在Thread 中包含了中断线程以及查询线程中断状态的方法。interrupt 方法能中断目标线程,而isInterrupted 方法能返回目标线程的中断状态。静态的interrupted 方法将清除当前线程的中断状态,并返回它之前的值,这也是清除中断状态的唯一方法。
阻塞库方法,例如Thread.sleep 和Object.wait 等,都会检查线程何时中断,井且在发现中断时提前返回。它们在响应中断时执行的操作包括:清除中断状态,抛出InterruptedException,表示阻塞操作由于中断而提前结束。JVM 井不能保证阻塞方法检测到中断的速度,但在实际情况中响应速度还是非常快的。
当线程在非阻塞状态下中断时,它的中断状态将被设置,然后根据将被取消的操作来检查中断状态以判断发生了中断。通过这样的方法,中断操作将变得“有黏性”一如果不触 InterruptedExcepti on, 那么中断状态将一直保持,直到明确地清除中断状态。
调用 interrupt 并不意味着立即停止目标线程正在进行的工作 , 而只是传递 了请求中断的消息
中断它并不会真正地中断一个正在运行的线程,而只是发出中断请求,然后由线程在下一个合适的时刻中断自己。(这些时刻也被称为取消点)。有些方法,例如wait 、sleep 和join 等,将严格地处理这种请求,当它们收到中断请求或者在开始执行时发现某个已被设置好的中断状态时,将抛出一个异常。设计良好的方法可以完全忽略这种请求,只要它们能使调用代码对中断请求进行某种处理。设计糟糕的方法可能会屏蔽中断请求,从而导致调用栈中的其他代码无法对中断请求作出响应。
在使用静态的interrupted 时应该小心,因为它会清除当前线程的中断状态。如果在调用interrupted 时返回了true, 那么除非你想屏蔽这个中断,否则必须对它进行处理—可以抛出lnterruptedException, 或者通过再次调用interrupt 来恢复中断状态
通常,中断是实现取消的最合理方式 。
BrokenPrimeProducer 中的问题很容易解决(和简化) :使用中断而不是boolean 标志来请求取消。在每次迭代循环中,有两个位置可以检测出中断:在阻塞的put方法调用中,以及在循环开始处查询中断状态时。由于调用了阻塞的put 方法,因此这里并不一定需要进行显式的检测,但执行检测却会使PrimeProducer 对中断具有更高的响应性,因为它是在启动寻找素数任务之前检查中断的,而不是在任务完成之后。如果可中断的阻塞方法的调用频率并不高,不足以获得足够的响应性,那么显式地检测中断状态能起到一定的帮助作用。
public class PrimeProducer extends Thread {
private final BlockingQueue<BigInteger> queue;
PrimeProducer(BlockingQueue<BigInteger> queue) {
this.queue = queue;
}
public void run() {
try {
BigInteger p = BigInteger.ONE;
while (!Thread.currentThread().isInterrupted())
queue.put(p = p.nextProbablePrime());
} catch (InterruptedException consumed) {
/* Allow thread to exit */
}
}
public void cancel() {
interrupt();
}
}
7.1.2 中断策略
中断策略规定线程如何解释某个中断请求——当发现中断请求时,应该做哪些工作(如果需要的话),对于中断来说是原子操作,以及以多快的速度来响应中断。
最合理的中断策略是某种形式的线程级(Thread-Level) 取消操作或服务级(Service-Level)取消操作:尽快退出,在必要时进行清理,通知某个所有者该线程已经退出。其他的中断策略,例如暂停服务或重新开始服务,但对于那些包含非标准中断策略的线程或线程池,只能用于能知道这些策略的任务中。
区分任务和线程对中断的反应是很重要的。一个中断请求可以有一个或多个接收者–中断线程池中的某个工作者线程,同时意味着“取消当前任务”和“关闭工作者线程”。
任务不会在其自己拥有的线程中执行,而是在某个服务(例如线程池)拥有的线程中执行。对于非线程所有者的代码来说(例如,对于线程池而言,任何在线程池实现以外的代码),应该小心地保存中断状态,这样拥有线程的代码才能对中断做出响应,即使“非所有者”代码也可以做出响应。
这就是为什么大多数可阻塞的库函数都只是抛出InterruptedException 作为中断响应。它们永远不会在某个由自己拥有的线程中运行,因此它们为任务或库代码实现了最合理的取消策略:尽快退出执行流程,并把中断信息传递给调用者,从而使调用栈中的上层代码可以采取进一步的操作。
当检查到中断请求时,任务并不需要放弃所有的操作——它可以推迟处理中断请求,并直到某个更合适的时刻。因此需要记住中断请求,并在完成当前任务后抛出InterruptedException 或者表示已收到中断请求。这项技术能够确保在更新过程中发生中断时,数据结构不会被破坏。
任务不应该对执行该任务的线程的中断策略做出任何假设,除非该任务被专门设计为在服务中运行,并且在这些服务中包含特定的中断策略。无论任务把中断视为取消,还是其他某个中断响应操作,都应该小心地保存执行线程的中断状态。如果除了将InterruptedException 传递给调用者外还需要执行其他操作,那么应该在捕获InterruptedException 之后恢复中断状态:
Thread.currentThread().interrupt();
正如任务代码不应该对其执行所在的线程的中断策略做出假设,执行取消操作的代码也不应该对线程的中断策略做出假设。线程应该只能由其所有者中断,所有者可以将线程的中断策略信息封装到某个合适的取消机制中,例如关闭(shutdown) 方法。
由于每个线程拥有各自的中断策略,因此除你知道中断对该线程的含义,否则就不应该中断这个线程 。
批评者曾嘲笑Java 的中断功能,因为它没有提供抢占式中断机制,而且还强迫开发人员必须处理InterruptedException。然而,通过推迟中断请求的处理,开发人员能制定更灵活的中断策略,从而使应用程序在响应性和健壮性之间实现合理的平衡。
7.1.3 响应中断
当调用可中断的阻塞函数时,例如Thread.sleep 或BlockingQueue.put 等,有两种实用策略可用于处理InterruptedException:
- 传递异常(可能在执行某个特定于任务的清除操作之后),从而使你的方法也成为可中断的阻塞方法。
- 恢复中断状态,从而使调用栈中的上层代码能够对其进行处理。
如果不想或无法传递InterruptedException (或许通过Runnable 来定义任务)。一种标准的方法就是通过再次调用 interrupt 来恢复中断状态。你不能屏蔽InterruptedException, 例如在catch 块中捕获到异常却不做任何处理,除非在你的代码中实现了线程的中断策略。虽然 PrimeProducer 屏蔽了中断,但这是因为它已经知道线程将要结束,因此在调用栈中已经没有上层代码需要知道中断信息。由于大多数代码并不知道它们将在哪个线程中运行,因此应该保存中断状态。
只有实现了线程中断策略的代码才可以屏蔽中断请求,在常规的任务和库代码中都不应该屏蔽中断请求
对于一些不支持取消但仍可以调用可中断阻塞方法的操作,它们必须在循环中调用这些方法,并在发现中断后重新尝试。在这种情况下,它们应该在本地保存中断状态,并在返回前恢复状态而不是在捕获InterruptedException 时恢复状态,如下程序所示。如果过早地设置中断状态,就可能引起无限循环,因为大多数可中断的阻塞方法都会在入口处检查中断状态,并且当发现该状态已被设置时会立即抛出lnterruptedException。(通常,可中断的方法会在阻塞或进行重要的工作前首先检查中断,从而尽快地响应中断)。
public class NoncancelableTask {
public Task getNextTask(BlockingQueue<Task> queue) {
boolean interrupted = false;
try {
while (true) {
try {
return queue.take();
} catch (InterruptedException e) {
interrupted = true;
// fall through and retry
}
}
} finally {
if (interrupted)
Thread.currentThread().interrupt();
}
}
interface Task {
}
}
如果代码不会调用可中断的阻塞方法,那么仍然可以通过在任务代码中轮询当前线程的中断状态来响应中断。要选择合适的轮询频率,就需要在效率和响应性之间进行权衡。如果响应性要求较高,那么不应该调用那些执行时间较长井且不响应中断的方法,从而对可调用的库代码进行一些限制。
在取消过程中可能涉及除了中断状态之外的其他状态。中断可以用来获得线程的注意,并且由中断线程保存的信息,可以为中断的线程提供进一步的指示。(当访问这些信息时,要确保使用同步。)
7.1.4 示例:计时运行
许多问题永远也无法解决(例如,枚举所有的素数),而某些问题,能很快得到答案,也可能永远得不到答案。在这些情况下,如果能够指定“最多花10 分钟搜索答案”或者“枚举出在10 分钟内能找到的答案”,那么将是非常有用的。
public class TimedRun1 {
private static final ScheduledExecutorService cancelExec = Executors.newScheduledThreadPool(1);
public static void timedRun(Runnable r, long timeout, TimeUnit unit) {
final Thread taskThread = Thread.currentThread();
cancelExec.schedule(new Runnable() {
public void run() {
taskThread.interrupt();
}
}, timeout, unit);
r.run();
}
}
程序TimedRun2解决了aSecondOfPrimes 的异常处理问题以及之前解决方案中的问题。执行任务的线程拥有自己的执行策略,即使任务不响应中断,限时运行的方法仍能返回到它的调用者。在启动任务线程之后, timedRun 将执行一个限时的join 方法。在join 返回后,它将检查任务中是否有异常抛出,如果有的话,则会在调用timedRun 的线程中再次抛出该异常。由于Throwable 将在两个线程之间共享,因此该变最被声明为volatile 类型,从而确保安全地将其从任务线程发布到timedRun 线程。
public class TimedRun2 {
private static final ScheduledExecutorService cancelExec = newScheduledThreadPool(1);
public static void timedRun(final Runnable r, long timeout, TimeUnit unit)
throws InterruptedException {
class RethrowableTask implements Runnable {
private volatile Throwable t;
public void run() {
try {
r.run();
} catch (Throwable t) {
this.t = t;
}
}
void rethrow() {
if (t != null)
throw launderThrowable(t);
}
}
RethrowableTask task = new RethrowableTask();
final Thread taskThread = new Thread(task);
taskThread.start();
cancelExec.schedule(new Runnable() {
public void run() {
taskThread.interrupt();
}
}, timeout, unit);
// join 阻塞当前线程的执行,等到被调用join的线程对象执行完毕才执行继续执行当前线程
taskThread.join(unit.toMillis(timeout));
task.rethrow();
}
}
7.1.5 通过Future 来实现取消
我们已经使用了一种抽象机制来管理任务的生命周期,处理异常,以及实现取消,即Future 。
ExecutorService.submit 将返回一个Future 来描述任务。Future 拥有一个cancel 方法,该方法带需要一个boolean 类型的参数 maylnterruptIfRunning, 表示取消操作是否成功。(这只是表示任务是否能够接收中断,而不是表示任务是否能检测并处理中断。)如果 maylnterruptIfRunning 为true 并且任务当前正在某个线程中运行,那么这个线程能被中断。如果这个参数为false, 那么意味着“若任务还没有启动,就不要运行它",这种方式应该用于那些不处理中断的任务中。
mayInterruptIfRunning
为true,该task
正在执行,这个task
应该被中断;mayInterruptIfRunning
为false,该task
正在执行,这个task
将会继续执行到完成,未运行则直接取消。
执行任务的线程是由标准的Executor 创建的,它实现了一种中断策略使得任务可以通过中断被取消,所以如果任务在标准Executor 中运行,井通过它们的Future 来取消任务,那么可以设置 mayInterruptIfRunning 。当尝试取消某个任务时,不宜直接中断线程池,因为你井不知道当中断请求到达时正在运行什么任务一一只能通过任务的Future 来实现取消。这也是在编写任务时要将中断视为一个取消请求的另一个理由:可以通过任务的Future 来取消它们。
另一个版本的timedRun: 将任务提交给一个ExecutorService, 井通过一个定时的Future.get 来获得结果。如果get 在返回时抛出了一个TimeoutException, 那么任务将通过它的Future 来取消。(为了简化代码,这个版本的timedRun 在finally 块中将直接调用Future.cancel, 因为取消一个已完成的任务不会带来任何影响。)如果任务在被取消前就抛出一个异常,那么该异常将被重新抛出以便由调用者来处理异常。在程序中还给出了另种良好的编程习惯:取消那些不再需要结果的任务。
public class TimedRun {
private static final ExecutorService taskExec = Executors.newCachedThreadPool();
public static void timedRun(Runnable r, long timeout, TimeUnit unit)
throws InterruptedException {
Future<?> task = taskExec.submit(r);
try {
task.get(timeout, unit);
} catch (TimeoutException e) {
// task will be cancelled below
} catch (ExecutionException e) {
// exception thrown in task; rethrow
throw launderThrowable(e.getCause());
} finally {
// Harmless if task already completed
task.cancel(true); // interrupt if running
}
}
}
当Future.get抛出InterruptedException或TimeoutException时,如果你知道不再需要结果,那么就可以调用Future.cancel 来取消任务。
7.1.6 处理不可中断的阻塞
Java 库中,许多可阻塞的方法都是通过提前返回或者抛出InterruptedException 来响应中断请求的,从而使开发人员更容易构建出能响应取消请求的任务。
并非所有的可阻塞方法或者阻塞机制都能响应中断;如果一个线程由于执行同步的Socket I/O 或者等待获得内锁而阻塞,那么中断请求只能设置线程的中断状态,除此之外没有其他任何作用。对于那些由于执行不可中断操作而被阻塞的线程,可以使用类似于中断的手段来停止这些线程,但这要求我们必须知道线程阻塞的原因。
Java.io 包中的同步Socket I/O 。在服务器应用程序中,最常见的阻塞 I/O 形式就是对套接字进行读取和写人。虽然InputStream 和OutputStream 中的read 和write 等方法都不会响应中断,但通过关闭底层的套接字,可以使得由于执行read 或write 等方法而被阻塞的线程抛出一个 SocketException 。
Java.io 包中的同步 I/O 。当中断一个正在 InterruptibleChannel 上等待的线程肘,将抛出ClosedByinterruptException 并关闭链路(这还会使得其他在这条链路上阻塞的线程同样抛出ClosedByinterruptException) 。当关闭一个InterruptibleChannel 时,将导致所有在链路操作上阻塞的线程都抛出AsynchronousCloseException。大多数标准的Channel 都实现了InterruptibleChannel 。
Selector 的异步 I/O 。如果一个线程在调用 Selector.select 方法(在java.nio.channels 中)时阻塞了,那么调用close 或wakeup 方法会使线程抛出ClosedSelectorException 并提前返回。
获取某个锁。如果一个线程由于等待某个内置锁而阻寒,那么将无法响应中断,因为线程认为它肯定会获得锁,所以将不会理会中断请求。但是,在Lock 类中提供了lockInterruptibly方法,该方法允许在等待一个锁的同时仍能响应中断。
程序ReaderThread 给出了如何封装非标准的取消操作。ReaderThread 管理了一个套接字连接,它采用同步方式从该套接字中读取数据,并将接收到的数据传递给 processBuffer。为了结束某个用户的连接或者关闭服务器, ReaderThread 改写了interrupt 方法,使其既能处理标准的中断,也能关闭底层的套接字。因此,无论ReaderThread 线程是在read方法中阻塞还是在某个可中断的阻塞方法中阻塞,都可以被中断并停止执行当前的工作。
// 通过改写interrupt 方法将非标准的取消操作封装在Thread 中
public class ReaderThread extends Thread {
private static final int BUFSZ = 512;
private final Socket socket;
private final InputStream in;
public ReaderThread(Socket socket) throws IOException {
this.socket = socket;
this.in = socket.getInputStream();
}
public void interrupt() {
try {
socket.close();
} catch (IOException ignored) {
} finally {
super.interrupt();
}
}
public void run() {
try {
byte[] buf = new byte[BUFSZ];
while (true) {
int count = in.read(buf);
if (count < 0)
break;
else if (count > 0)
processBuffer(buf, count);
}
} catch (IOException e) { /* Allow thread to exit */
}
}
public void processBuffer(byte[] buf, int count) {
}
}
7.1.7 采用newTaskFor来封装非标准的取消
我们可以通过newTaskFor 方法来进一步优化ReaderThread 中封装非标准取消的技术,这是Java 6 ThreadPoolExecutor 中的新增功能。
当把一个Callable 提交给ExecutorService 时,submit 方法会返回一个Future, 我们可以通过这个Future 来取消任务。newTaskFor是一个工厂方法,它将创建Future 来代表任务。newTaskFor 还能返回一个RunnableFuture 接口,该接口扩展了Future 和Runnable (并由 FutureTask 实现)。
通过定制表示任务的Future 可以改变Future.cancel 的行为。例如,定制的取消代码可以实现日志记录或者收集取消操作的统计信息,以及取消一些不响应中断的操作。通过改写interrupt 方法, ReaderThread 可以取消基于套接字的线程。同样,通过改写任务的Future.cancel 方法也可以实现类似的功能。
// 通过newTaskFor 将非标准的取消操作封装在一个任务中
public abstract class SocketUsingTask <T> implements CancellableTask<T> {
@GuardedBy("this") private Socket socket;
protected synchronized void setSocket(Socket s) {
socket = s;
}
public synchronized void cancel() {
try {
if (socket != null)
socket.close();
} catch (IOException ignored) {
}
}
public RunnableFuture<T> newTask() {
return new FutureTask<T>(this) {
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
try {
SocketUsingTask.this.cancel();
} finally {
return super.cancel(mayInterruptIfRunning);
}
}
};
}
}
interface CancellableTask <T> extends Callable<T> {
void cancel();
RunnableFuture<T> newTask();
}
@ThreadSafe
class CancellingExecutor extends ThreadPoolExecutor {
public CancellingExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
public CancellingExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
}
public CancellingExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
}
public CancellingExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
}
@Override
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
if (callable instanceof CancellableTask)
return ((CancellableTask<T>) callable).newTask();
else
return super.newTaskFor(callable);
}
}
SocketUsingTask 实现了CancellableTask, 并定义了Future.cancel 来关闭套接字和调用super.cancel 。如果SocketUsingTask 通过其自己的Future 来取消,那么底层的套接字将被关闭井且线程将被中断。因此它提高了任务对取消操作的响应性:不仅能够在调用可中断方法的同时确保响应取消操作,而且还能调用可阻调的套接字I/O 方法。
7.2 停止基于线程的服务
应用程序通常会创建拥有多个线程的服务,例如线程池,并且这些服务的生命周期通常比创建它们的方法的生命周期更长。如果应用程序准备退出,那么这些服务所拥有的线程也需要结束。由于无法通过抢占式的方法来停止线程,因此它们需要自行结束。
正确的封装原则是:除非拥有某个线程,否则不能对该线程进行操控。在线程API 中,并没有对线程所有权给出正式的定义:线程由Thread 对象表示,并且像其他对象一样可以被自由共享。然而,线程有一个相应的所有者,即创建该线程的类。因此线程池是其工作者线程的所有者,如果要中断这些线程,那么应该使用线程池。
与其他封装对象一样,线程的所有权是不可传递的:应用程序可以拥有服务,服务也可以拥有工作者线程,但应用程序并不能拥有工作者线程,因此应用程序不能直接停止工作者线程。相反,服务应该提供生命周期方法(Lifecycle Method) 来关闭它自己以及它所拥有的线程。这样,当应用程序关闭该服务时,服务就可以关闭所有的线程了。在ExecutorService 中提供了shutdown 和shutdownNow 等方法。同样,在其他拥有线程的服务中也应该提供类似的关闭机制。
对于持有线程的服务,只要服务的存在时间大于创建线程的方法的存在时间,那么就应该提供生命周期方法。
7.2.1 示例:日志服务
在大多数服务器应用程序中都会用到日志,在代码中插入println 语句就是一种简单的日志。像PrintWriter 这样的字符流类是线程安全的,因此这种简单的方法不需要显式的同步。另外一种替代方法是通过调用log 方法将日志消息放入某个队列中,并由其他线程来处理。
public class LogService {
private final BlockingQueue<String> queue;
private final LoggerThread loggerThread;
private final PrintWriter writer;
@GuardedBy("this") private boolean isShutdown;
@GuardedBy("this") private int reservations;
public LogService(Writer writer) {
this.queue = new LinkedBlockingQueue<String>();
this.loggerThread = new LoggerThread();
this.writer = new PrintWriter(writer);
}
public void start() {
loggerThread.start();
}
public void stop() {
synchronized (this) {
isShutdown = true;
}
loggerThread.interrupt();
}
public void log(String msg) throws InterruptedException {
synchronized (this) {
if (isShutdown)
throw new IllegalStateException(/*...*/);
++reservations;
}
queue.put(msg);
}
private class LoggerThread extends Thread {
public void run() {
try {
while (true) {
try {
synchronized (LogService.this) {
if (isShutdown && reservations == 0)
break;
}
String msg = queue.take();
synchronized (LogService.this) {
--reservations;
}
writer.println(msg);
} catch (InterruptedException e) { /* retry */
}
}
} finally {
writer.close();
}
}
}
}
为LogWriter 提供可靠关闭操作的方法是解决竞态条件问题,因而要使日志消息的提交操作成为原子操作。然而,我们不希望在消息加入队列时去持有一个锁,因为put 方法本身就可以阻塞。我们采用的方法是:通过原子方式来检查关闭请求,并且有条件地递增一个计数器来保持“提交消息的权利。
7.2.2 关闭ExecutorService
ExecutorService 提供了两种关闭方法:使用shutdown 正常关闭,以及使用shutdownNow 强行关闭。在进行强行关闭时, shutdownNow 首先关闭当前正在执行的任务,然后返回所有尚未启动的任务清单。
这两种关闭方式的差别在于各自的安全性和响应性:强行关闭的速度更快,但风险也更大,因为任务很可能在执行到一半时被结束;而正常关闭虽然速度慢,但却更安全,因为ExecutorService 会一直等到队列中的所有任务都执行完成后才关闭。在其他拥有线程的服务中也应该考虑提供类似的关闭方式以供选择。
简单的程序可以直接在main 函数中启动和关闭全局的ExecutorService 。而在复杂程序中,通常会将ExecutorService 封装在某个更高级别的服务中,并且该服务能提供其自己的生命周期方法。
public class LogService {
private final ExecutorService exec = newSingleThreadPool();
...
public void start() {}
public void stop() throws InterruptedException {
try{
exec.shutdown();
exec.awaitTermination(TIMEOUT, UNIT);
}finally{
write.close();
}
}
public void log(String msg) {
try{
exec.execute(new WriteTask(msg));
} catch (InterruptedException ignored){ }
}
}
7.2.3 “毒丸” 对象
另一种关闭生产者-消费者服务的方式就是使用“毒丸( Poison Pill )" 对象:“毒丸”一个放在队列上的对象,其含义是:“当得到这个对象时,立即停止。”在FIFO (先进先出)队列中,“毒丸”对象将确保消费者在关闭之前首先完成队列中的所有工作,在提交“毒丸”对象之前提交的所有工作都会被处理,而生产者在提交了“毒丸”对象后,将不会再提交任何工作。
// 一个单生产者-单消费者的桌面搜索示例,使用了“毒丸”对象来关闭服务。
public class IndexingService {
private static final int CAPACITY = 1000;
private static final File POISON = new File("");
private final IndexerThread consumer = new IndexerThread();
private final CrawlerThread producer = new CrawlerThread();
private final BlockingQueue<File> queue;
private final FileFilter fileFilter;
private final File root;
public IndexingService(File root, final FileFilter fileFilter) {
this.root = root;
this.queue = new LinkedBlockingQueue<File>(CAPACITY);
this.fileFilter = new FileFilter() {
public boolean accept(File f) {
return f.isDirectory() || fileFilter.accept(f);
}
};
}
private boolean alreadyIndexed(File f) {
return false;
}
// 通过“毒丸”对象来关闭服务
public void start() {
producer.start();
consumer.start();
}
public void stop() {
producer.interrupt();
}
public void awaitTermination() throws InterruptedException {
consumer.join();
}
// lndexingSeNice 的生产者线程
class CrawlerThread extends Thread {
public void run() {
try {
crawl(root);
} catch (InterruptedException e) { /* fall through */
} finally {
while (true) {
try {
queue.put(POISON);
break;
} catch (InterruptedException e1) { /* retry */
}
}
}
}
private void crawl(File root) throws InterruptedException {
File[] entries = root.listFiles(fileFilter);
if (entries != null) {
for (File entry : entries) {
if (entry.isDirectory())
crawl(entry);
else if (!alreadyIndexed(entry))
queue.put(entry);
}
}
}
}
// lndexingService 的消费者线程
class IndexerThread extends Thread {
public void run() {
try {
while (true) {
File file = queue.take();
if (file == POISON)
break;
else
indexFile(file);
}
} catch (InterruptedException consumed) {
}
}
public void indexFile(File file) {
/*...*/
};
}
}
只有在生产者和消费者的数量都已知的情况下,才可以使用“毒丸”对象。在IndexingService 中采用的解决方案可以扩展到多个生产者:只需每个生产者都向队列中放入一个“毒丸”对象,并且消费者仅当在接收到 Nproducers个“毒丸”对象时才停止。这种方法也可以扩展到多个消费者的情况,只需生产者将 Nconsumers 个“毒丸”对象放入队列。然而,当生产者和消费者的数最较大时,这种方法将变得难以使用。只有在无界队列中,“毒丸”对象才能可靠地工作。
7.2.4 示例:只执行一次的服务
如果某个方法需要处理一批任务,并且当所有任务都处理完成后才返回,那么可以通过一个私有的Executor 来简化服务的生命周期管理,其中该Executor 的生命周期是由这个方法来控制的。(在这种情况下, invokeAll 和invokeAny 等方法通常会起较大的作用。)
程序 checkMail 方法能在多台主机上并行地检查新邮件。它创建一个私有的Executor, 并向每台主机提交一个任务。然后,当所有邮件检查任务都执行完成后,关闭Executor 并等待结束。
public class CheckForMail {
public boolean checkMail(Set<String> hosts, long timeout, TimeUnit unit)
throws InterruptedException {
ExecutorService exec = Executors.newCachedThreadPool();
final AtomicBoolean hasNewMail = new AtomicBoolean(false);
try {
for (final String host : hosts)
exec.execute(new Runnable() {
public void run() {
if (checkMail(host))
hasNewMail.set(true);
}
});
} finally {
exec.shutdown();
exec.awaitTermination(timeout, unit);
}
return hasNewMail.get();
}
private boolean checkMail(String host) {
// Check for mail
return false;
}
}
7.2.5 shutdownNow 的局限性
当通过shutdownNow 来强行关闭ExecutorService 时,它会尝试取消正在执行的任务,井返回所有已提交但尚未开始的任务,从而将这些任务写入日志或者保存起来以便之后进行处理。
然而,我们无法通过常规方法来找出哪些任务已经开始但尚未结束。这意味着我们无法在关闭过程中知道正在执行的任务的状态,除非任务本身会执行某种检查。要知道哪些任务还没有完成,你不仅需要知道哪些任务还没有开始,而且还需要知道当Executor 关闭时哪些任务正在执行。
TrackingExecutor 中给出了如何在关闭过程中判断正在执行的任务。通过封装ExecutorService 井使得execute (类似地还有submit, 在这里没有给出)记录哪些任务是在关闭后取消的, TrackingExecutor 可以找出哪些任务已经开始但还没有正常
完成。在Executor 结束后, getCancelledTasks 返回被取消的任务清单。要使这项技术能发挥作用,任务在返回时必须维持线程的中断状态,在所有设计良好的任务中都会实现这个功能。
public class TrackingExecutor extends AbstractExecutorService {
private final ExecutorService exec;
private final Set<Runnable> tasksCancelledAtShutdown =
Collections.synchronizedSet(new HashSet<Runnable>());
public TrackingExecutor(ExecutorService exec) {
this.exec = exec;
}
public void shutdown() {
exec.shutdown();
}
public List<Runnable> shutdownNow() {
return exec.shutdownNow();
}
public boolean isShutdown() {
return exec.isShutdown();
}
public boolean isTerminated() {
return exec.isTerminated();
}
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
return exec.awaitTermination(timeout, unit);
}
public List<Runnable> getCancelledTasks() {
if (!exec.isTerminated())
throw new IllegalStateException(/*...*/);
return new ArrayList<Runnable>(tasksCancelledAtShutdown);
}
public void execute(final Runnable runnable) {
exec.execute(new Runnable() {
public void run() {
try {
runnable.run();
} finally {
if (isShutdown()
&& Thread.currentThread().isInterrupted())
tasksCancelledAtShutdown.add(runnable);
}
}
});
}
}
在TrackingExecutor 中存在一个不可避免的竞态条件,从而产生“误报”问题:一些被认为已取消的任务实际上已经执行完成。这个问题的原因在于,在任务执行最后一条指令以及线程池将任务记录为 “结束“ 的两个时刻之间,线程池可能被关闭。如果任务是幕等的(Idempotent, 即将任务执行两次与执行一次会得到相同的结果),那么这不会存在问题,在网页爬虫程序中就是这种情况。否则,在应用程序中必须考虑这种风险,并对“误报”问题做好准备。
7.3 处理非正常的线程终止
当单线程的控制台程序由于发生了一个未捕获的异常而终止时,程序将停止运行,并产生与程序正常输出非常不同的栈追踪信息,这种情况是很容易理解的。然而,如果并发程序中的某个线程发生故障,那么通常井不会如此明显。在控制台中可能会输出栈追踪信息,但没有人会观察控制台。此外,当线程发生故障时,应用程序可能看起来仍然在工作,所以这个失败很可能会被忽略。幸运的是,我们有可以监测并防止在程序中“遗漏“线程的方法。
导致线程提前死亡的最主要原因就是RuntimeException。由于这些异常表示出现了某种编程错误或者其他不可修复的错误,因此它们通常不会被捕获。它们不会在调用栈中逐层传递,而是默认地在控制台中输出栈追踪信息,并终止线程。
线程非正常退出的后果可能是良性的,也可能是恶性的,这要取决于线程在应用程序中的作用。虽然在线程池中丢失一个线程可能会对性能带来一定影响,但如果程序能在包含50 个线程的线程池上运行良好,那么在包含49个线程的线程池上通常也能运行良好。
任何代码都可能抛出一个RuntimeException 。每当调用另一个方法时,都要对它的行为保持怀疑,不要盲目地认为它一定会正常返回,或者一定会抛出在方法原型中声明的某个已检查异常。对调用的代码越不熟悉,就越应该对其代码行为保持怀疑。
在任务处理线程(例如线程池中的工作者线程或者Swing 的事件派发线程等)的生命周期中,将通过某种抽象机制(例如Runnable) 来调用许多未知的代码,我们应该对在这些线程中执行的代码能否表现出正确的行为保持怀疑。
因此,这些线程应该在try-catch 代码块中调用这些任务,这样就能捕获那些未检查的异常了,或者也可以使用try-finally 代码块来确保框架能够知道线程非正常退出的情况,并做出正确的响应。在这种情况下,你或许会考虑捕获RuntimeException, 即当通过Runnable 这样的抽象机制来调用未知的和不可信的代码时。
在程序中给出了如何在线程池内部构建一个工作者线程。如果任务抛出了一个未检查异常,那么它将使线程终结,但会首先通知框架该线程已经终结。然后,框架可能会用新的线程来代替这个工作线程,也可能不会,因为线程池正在关闭,或者当前已有足够多的线程能满足需要。ThreadPoolExecutor 和Swing 都通过这项技术来确保行为糟糕的任务不会影响到后续任务的执行。
public void run(){
Throwable thrown = null;
try{
while(!isInterrupted()){
runTask(getTaskFromWorkQueue());
}
}catch (Throwable e){
thrown = e;
}finally{
threadExited(this, thrown);
}
}
未捕获异常的处理
在Thread API 中同样提供了UncaughtExceptionHandler,它能检测出某个线程由于未捕获的异常而终结的情况。这两种方法是互补的,通过将二者结合在一起,就能有效地防止线程泄漏问题。
当一个线程由于未捕获异常而退出时, JVM 会把这个事件报告给应用程序提供的UncaughtExceptionHandler 异常处理器。如果没有提供任何异常处理器,那么默认的行为是将栈追踪信息输出到System.err。
异常处理器如何处理未捕获异常,取决于对服务质匮的需求。最常见的响应方式是将一个错误信息以及相应的栈追踪信息写入应用程序日志中。异常处理器还可以采取更直接的响应,例如尝试重新启动线程,关闭应用程序,或者执行其他修复或诊断等操作。
在运行时间较长的应用程序中,通常会为所有线程的未捕荻异常指定同一个异常处理器,并且该处理器至少会将异常信息记录到日志中。
要为线程池中的所有线程设置一个UncaughtExceptionHandler, 需要为ThreadPoolExecutor的构造函数提供一个ThreadFactory 。(与所有的线程操控一样,只有线程的所有者能够改变线程的UncaughtExceptionHandler 。)标准线程池允许当发生未捕获异常时结束线程,但由于使用了一个try-finally 代码块来接收通知,因此当线程结束时,将有新的线程来代替它。如果没有提供捕获异常处理器或者其他的故障通知机制,那么任务会悄悄失败,从而导致极大的混乱。如果你希望在任务由于发生异常而失败时获得通知,并且执行一些特定于任务的恢复操作,那么可以将任务封装在能捕获异常的Runnable 或Callable 中,或者改ThreadPoolExecutor 的afterExecute 方法。
令人困惑的是,只有通过execute 提交的任务,才能将它抛出的异常交给未捕获异常处理器,而通过submit 提交的任务,无论是抛出的未检查异常还是已检查异常,都将被认为是任务返回状态的一部分。如果一个由submit 提交的任务由于抛出了异常而结束,那么这个异常将被Future.get 封装在ExecutionException 中重新抛出。
7.4 JVM 关闭
JVM 既可以正常关闭,也可以强行关闭。正常关闭的触发方式有多种,包括:当个“正常(非守护)“线程结束时,或者当调用了System.exit 时,或者通过其他特定于平台的方法关闭时(例如发送了SIGINT 信号或键入Ctrl-C) 。虽然可以通过这些标准方法来正常闭JVM, 但也可以通过调用Runtime. halt 或者在操作系统中“杀死”JVM 进程(例如发送SIGKILL) 来强行关闭JVM 。
7.4.1 关闭钩子
在正常关闭中, JVM 首先调用所有已注册的关闭钩子(Shutdown Hook) 。关闭钩子是指通过Runtime.addShutdownHook 注册的但尚未开始的线程。JVM 并不能保证关闭钩子的调用顺序。在关闭应用程序线程时,如果有(守护或非守护)线程仍然在运行,那么这些线程接下来将与关闭进程并发执行。当所有的关闭钓子都执行结束时,如果runFinalizersOnExit 为true 那么JVM 将运行终结器,然后再停止。JVM 并不会停止或中断任何在关闭时仍然运行的应用程序线程。当JVM 最终结束时,这些线程将被强行结束。如果关闭钩子或终结器没有执行完成,那么正常关闭进程”挂起”并且JVM 必须被强行关闭。当被强行关闭时,只是关闭JVM, 而不会运行关闭钩子。
关闭钩子应该是线程安全的:它们在访问共享数据时必须使用同步机制,并且小心地避免发生死锁,这与其他并发代码的要求相同。而且,关闭钩子不应该对应用程序的状态(例如,其他服务是否已经关闭,或者所有的正常线程是否已经执行完成)或者JVM 的关闭原因做出任何假设,因此在编写关闭钩子的代码时必须考虑周全。最后,关闭钩子必须尽快退出,因为它们会延迟 JVM 的结束时间,而用户可能希望 JVM 能尽快终止。
关闭钩子可以用于实现服务或应用程序的清理工作,例如删除临时文件,或者清除无法由操作系统自动清除的资源。
由于关闭钓子将并发执行,因此在关闭日志文件时可能导致其他需要日志服务的关闭钩子产生问题。为了避免这种情况,关闭钩子不应该依赖那些可能被应用程序或其他关闭钩子关闭的服务。
实现这种功能的一种方式是对所有服务使用同一个关闭钩子(而不是每个服务使用一
个不同的关闭钩子),并且在该关闭钩子中执行一系列的关闭操作。这确保了关闭操作在单个线程中串行执行,从而避免了在关闭操作之间出现竞态条件或死锁等问题。无论是否使用关闭钓子,都可以使用这项技术,通过将各个关闭操作串行执行而不是并行执行,可以消除许多潜在的故障。当应用程序需要维护多个服务之间的显式依赖信息时,这项技术可以确保关闭操作按照正确的顺序执行。
// 通过注册一个关闭钩子来停止日志服务
public void start(){
Runtime.getRuntime().addShudownHook(() -> {
try{
LogService.this.stop();
}catch(InterruptedException ignored) {}
});
}
7.4.2 守护线程
有时候,你希望创建一个线程来执行一些辅助工作,但又不希望这个线程阻碍JVM 的关闭。在这种情况下就需要使用守护线程(Daemon Thread) 。
线程可分为两种:普通线程和守护线程。在JVM 启动时创建的所有线程中,除了主线程以外,其他的线程都是守护线程(例如垃圾回收器以及其他执行辅助工作的线程)。当创个新线程时,新线程将继承创建它的线程的守护状态,因此在默认情况下,主线程创建的所有线程都是普通线程。
普通线程与守护线程之间的差异仅在于当线程退出时发生的操作。当一个线程退出时,JVM 会检查其他正在运行的线程,如果这些线程都是守护线程,那么JVM 会正常退出操作。当JVM 停止时,所有仍然存在的守护线程都将被抛弃——既不会执行finally 代码块,也不会执行回卷栈,而JVM 只是直接退出。
我们应尽可能少地使用守护线程——很少有操作能够在不进行清理的情况下被安全地抛弃。特别是,如果在守护线程中执行可能包含 I/O 操作的任务,那么将是一种危险的行为。护线程最好用于执行“内部“任务,例如周期性地从内存的缓存中移除逾期的数据。
守护进程不能用来代替应用程序管理程序中各个服务的生命周期
7.4.3 终结器
当不再需要内存资源时,可以通过垃圾回收器来回收它们,但对于其他一些资源,例如文件句柄或套接字句柄,当不再需要它们时,必须显式地交还给操作系统。为了实现这个功能,垃圾回收器对那些定义了finalize 方法的对象会进行特殊处理:在回收器释放它们后,调用它们的finalize 方法,从而保证一些持久化的资源被释放。
由于终结器可以在某个由JVM 管理的线程中运行,因此终结器访问的任何状态都可能被多个线程访问,这样就必须对其访问操作进行伺步。终结器井不能保证它们将在何时运行甚至是否会运行,并且复杂的终结器通常还会在对象上产生巨大的性能开销。要编写正确的终结器是非常困难的 。在大多数情况下,通过使用finally 代码块和显式的close 方法,能够比使用终结器更好地管理资源。唯一的例外情况在于:当需要管理对象,井且该对象持有的资源是通过本地方法获得的。基于这些原因以及其他一些原因,我们要尽量避免编写或使用包含终结器的类(除非是平台库中的类)
避免使用终结器
小结
在任务、线程、服务以及应用程序等模块中的生命周期结束问题,可能会增加它们在设计和实现时的复杂性。Java 并没有提供某种抢占式的机制来取消操作或者终结线程。相反,它提供了一种协作式的中断机制来实现取消操作,但这要依赖于如何构建取消操作的协议,以及能否始终遵循这些协议。通过使用FutureTask 和Executor 框架,可以帮助我们构建可取消的任务和服务。
第八章 线程池的使用
8.1 在任务与执行策略之间的隐性耦合
Executor 框架可以将任务的提交与任务的执行策略解耦开来。就像许多对复杂过程的解耦操作那样。虽然Executor 框架为制定和修改执行策略都提供了相当大的灵活性,但并非所有的任务都能适用所有的执行策略。有些类型的任务需要明确地指定执行策略,包括:
依赖性任务。大多数行为正确的任务都是独立的:它们不依赖于其他任务的执行时序、执行结果或其他效果。当在线程池中执行独立的任务时,可以随意地改变线程池的大小和配置,这些修改只会对执行性能产生影响。然而,如果提交给线程池的任务需要依赖其他的任务,那么就隐含地给执行策略带来了约束,此时必须小心地维持次些执行策略以避免产生活跃性问题
使用线程封闭机制的任务。与线程池相比,单线程的Executor 能够对并发性做出更强的承诺。它们能确保任务不会并发地执行,使你能够放宽代码对线程安全的要求。对象可以封闭在任务线程中,使得在该线程中执行的任务在访问该对象时不需要同步,即使这些资源不是线程安全的也没有问题。这种情形将在任务与执行策略之间形成隐式的耦合—任务要求其执行所在的Executor 是单线程的包如果将Executor 从单线程环境改为线程池环境,那么将会失去线程安全性。
对响应时间敏感的任务。GUI 应用程序对于响应时间是敏感的:如果用户在点击按钮后需要很长延迟才能得到可见的反馈,那么他们会感到不满。如果将一个运行时间较长的任务提交到单线程的Executor 中,或者将多个运行时间较长的任务提交到一个只包含少量线程的线程池中,那么将降低由该Executor 管理的服务的响应性。
使用Thread Local 的任务。使每个线程都可以拥有某个变量的一个私有"版本”。然而,只要条件允许, Executor 可以自由地重用这些线程。在标准的Executor 实现中,当执行需求较低时将回收空闲线程,而当需求增加时将添加新的线程,并且如果从任务中抛出了一个未检查异常,那么将用一个新的工作者线程来替代抛出异常的线程。只有当线程本地值的生命周期受限于任务的生命周期时,在线程池的线程中使用 ThreadLocal 才有意义,而在线程池的线程中不应该使用 ThreadLocal 在任务之间传递值。
只有当任务都是同类型的并且相互独立时,线程池的性能才能达到最佳。如果将运行时间较长的与运行时间较短的任务混合在一起,那么除非线程池很大,否则将可能造成“拥塞”。如果提交的任务依赖于其他任务,那么除非线程池无限大,否则将可能造成死锁。幸运的是,在基于网络的典型股务器应用程序中一一网页服务器、邮件服务器以及文件服务器等,它们的请求通常都是同类型的并且相互独立的。
在一些任务中,需要拥有或排除某种特定的执行策略。如果某些任务依赖于其他的任务,那么会要求线程池足够大,从而确保它们依赖任务不会被放入等待队列中或被拒绝,而采用线程封闭机制的任务需要串行执行。通过将这些需求写入文档,将来的代码维护人员就不会由于使用了某种不合适的执行策略而破坏安全性或活跃性。
8.1.1 线程饥饿死锁
在线程池中,如果任务依赖于其他任务,那么可能产生死锁。在单线程的Executor 中,如果一个任务将另一个任务提交到同一个Executor, 并且等待这个被提交任务的结果,那么通常会引发死锁。第二个任务停留在工作队列中,并等待第一个任务完成,而第一个任务又无法完成,因为它在等待第二个任务的完成。在更大的线程池中,如果所有正在执行任务的线程都由于等待其他仍处于工作队列中的任务而阻塞,那么会发生同样的问题。
这种现象被称为线程饥饿死锁(Thread Starvation Deadlock) ,只要线程池中的任务需要无限期地等待一些必须由池中其他任务才能提供的资源或条件,例如某个任务等待另一个任务的返回值或执行结果,那么除非线程池足够大,否则将发生线程饥饿死锁。
每当提交了一个有依赖性的 Exeeutor 任务时,要清楚地知道可能会出现线程“饥饿”死锁,因此需要在代码或配置 Executor 的配置文件中记录线程池的大小限制或配置根制。
除了在线程池大小上的显式限制外,还可能由于其他资源上的约束而存在一些隐式限制。如果应用程序使用一个包含10 个连接的JDBC 连接池,并且每个任务需要一个数据库连接,那么线程池就好像只有10 个线程,因为当超过10 个任务时,新的任务需要等待其他任务释放连接。
8.1.2 运行时间较长的任务
如果任务阻塞的时间过长,那么即使不出现死锁,线程池的响应性也会变得糟糕。执行时间较长的任务不仅会造成线程池堵塞,甚至还会增加执行时间较短任务的服务时间。如果线程池中线程的数量远小于在稳定状态下执行时间较长任务的数量,那么到最后可能所有的线程都会运行这些执行时间较长的任务,从而影响整体的响应性。
有一项技术可以缓解执行时间较长任务造成的影响,即限定任务等待资源的时间,而不要无限制地等待。在平台类库的大多数可阻塞方法中,都同时定义了限时版本和无限时版本,例如Thread.join 、BlockingQueue.put 、CountDownLatch.await 以及Selector.select 等。
如果等待超时,那么可以把任务标识为失败,然后中止任务或者将任务重新放回队列以便随后执行。这样,无论任务的最终结果是否成功,这种办法都能确保任务总能继续执行下去,并将线程释放出来以执行一些能更快完成的任务。如果在线程池中总是充满了被阻塞的任务,那么也可能表明线程池的规模过小。
8.2 设置线程池的大小
线程池的理想大小取决于被提交任务的类型以及所部署系统的特性。在代码中通常不会固定线程池的大小,而应该通过某种配置机制来提供,或者根据 Runtime.availableProcessors 来动态计算。
幸运的是,要设置线程池的大小也并不困难,只需要避免“过大”和“过小”这两种极端情况。如果线程池过大,那么大量的线程将在相对很少的CPU 和内存资源上发生竞争,这不仅会导致更高的内存使用量,而且还可能耗尽资源。如果线程池过小,那么将导致许多空闲的处理器无法执行工作,从而降低吞吐率。
当然, CPU 周期井不是唯一影响线程池大小的资源,还包括内存、文件句柄、套接字句柄和数据库连接等。计算这些资源对线程池的约束条件是更容易的:计算每个任务对该资源的需求最,然后用该资源的可用总量除以每个任务的需求量,所得结果就是线程池大小的上限。
当任务需要某种通过资源池来管理的资源时,例如数据库连接,那么线程池和资源池的大小将会相互影响。如果每个任务都需要一个数据库连接,那么连接池的大小就限制了线程池的大小。同样,当线程池中的任务是数据库连接的唯一使用者时,那么线程池的大小又将限制连接池的大小。
8.3 配置ThreadPool Executor
ThreadPoolExecutor 为一些Executor 提供了基本的实现,这些Executor 是由Executors 中的newCachedThreadPool 、newFixedThreadPool 和newScheduledThreadExecutor 等工厂方法返回的。ThreadPoolExecutor 是一个灵活的、稳定的线程池,允许进行各种定制。如果默认的执行策略不能满足需求,那么可以通过ThreadPoolExecutor 的构造函数来实例化一个对象,并根据自己的需求来定制,并且可以参考Executors 的源代码来了解默认配置下的执行策略,然后再以这些执行策略为基础进行修改。
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
8.3.1 线程的创建与销毁
线程池的基本大小(Core Pool Size) 、最大大小(Maximum Pool Size) 以及存活时间等因素共同负责线程的创建与销毁。基本大小也就是线程池的目标大小,即在没有任务执行时e线程池的大小,并且只有在工作队列满了的情况下才会创建超出这个数晕的线程。线程池的最大大小表示可同时活动的线程数最的上限。如果某个线程的空闲时间超过了存活时间,那么将被标记为可回收的,并且当线程池的当前大小超过了基本大小时,这个线程将被终止。
通过调节线程池的基本大小和存活时间,可以帮助线程池回收空闲线程占有的资源,从而使得这些资源可以用于执行其他工作。(显然,这是一种折衷:回收空闲线程会产生额外的延迟,因为当需求增加时,必须创建新的线程来满足需求。)
newFixedThreadPool 工厂方法将线程池的基本大小和最大大小设置为参数中指定的值,而且创建的线程池不会超时。newCachedThreadPool 工厂方法将线程池的最大大小设置为Integer.MAX_VALUE, 而将基本大小设置为零,并将超时设置为1 分钟,这种方法创建出来的线程池可以被无限扩展,并且当需求降低时会自动收缩。其他形式的线程池可以通过显式的ThreadPoolExecutor 构造函数来构造。
8.3.2 管理队列任务
在有限的线程池中会限制可并发执行的任务数量。(单线程的Executor 是一种值得注意的特例:它们能确保不会有任务并发执行,因为它们通过线程封闭来实现线程安全性。)
如果无限制地创建线程,那么将导致不稳定性,并通过采用固定大小的线程池(而不是每收到一个请求就创建一个新线程)来解决这个问题。然而,这个方案并不完整。在高负载情况下,应用程序仍可能耗尽资源,只是出现问题的概率较小。即使请求的平均到达速率很稳定,也仍然会出现请求突增的情况。
ThreadPoolExecutor 允许提供一个BlockingQueue 来保存等待执行的任务。基本的任务排队方法有3 种:无界队列、有界队列和同步移交(Synchronous Handof汃队列的选择与其他的配置参数有关,例如线程池的大小等。
newFixedThreadPool 和newSingleThreadExecutor 在默认情况下将使用一个无界的LinkedBlockingQueue 。如果任务持续快速地到达,并且超过了线程池处理它们的速度,那么队列将无限制地增加。一种更稳妥的资源管理策略是使用有界队列,例如ArrayB lockingQueue 、有界的LinkedBlockingQueue 、PriorityBlockingQueue。
对于非常大的或者无界的线程池,可以通过使用 SynchronousQueue 来避免任务排队,以及直接将任务从生产者移交给工作者线程。SynchronousQueue 不是一个真正的队列,而是一种在线程之间进行移交的机制。
- 要将一个元素放入 SynchronousQueue 中,必须有另一个线程正在等待接受这个元素。
- 如果没有线程正在等待,并且线程池的当前大小小于最大值,那么ThreadPoolExecutor 将创建一个新的线程
- 否则根据饱和策略,这个任务将被拒绝。
使用直接移交将更高效,因为任务会直接移交给执行它的线程,而不是被首先放在队列中,然后由工作者线程从队列中提取该任务。只有当线程池是无界的或者可以拒绝任务时, SynchronousQueue才有实际价值。在newCachedThreadPool 工厂方法中就使用了SynchronousQueue 。
当使用像LinkedBlockingQueue 或ArrayBlockingQueue 这样的FIFO(先进先出)队列时,任务的执行顺序与它们的到达顺序相同。如果想进一步控制任务执行顺序,还可以使用PriorityBlockingQueue, 这个队列将根据优先级来安排任务。任务的优先级是通过自然顺序或Comparator (如果任务实现了Comparable) 来定义的。
对于Executor,newCachedThreadPool 工厂方法是一种很好的默认选择,它能提供比固定大小的线程池更好地排队特性。当需要限制当前任务的任务的数量以满足资源管需求时,那么可以选择固定大小的线程池,就像在接收网络客户请求的服务器应用程序中,如果不进行限制,那么很容易发生过载的情况。
只有当任务相互独立时,为线程池或工作队列设置界限才是合理的。如果任务之间存在依赖性,那么有界的线程池或队列就可能导致线程”饥饿“死锁问题。此时应该使用无界的线程池,例如newCachedThreadPool。
8.3.3 饱和策略
当有界队列被填满后,饱和策略开始发挥作用。ThreadPoolExecutor 的饱和策略可以通过调用setRejectedExecutionHandler 来修改。(如果某个任务被提交到一个巳被关闭的Executor时,也会用到饱和策略。)JDK 提供了几种不同的 RejectedExecutionHandler 实现,每种实现都包含有不固的饱和策略: AbortPolicy 、CallerRunsPolicy 、DiscardPolicy 和DiscardOldestPolicy 。
“中止(Abort) “策略是默认的饱和策略,该策略将抛出未检查的RejectedExecutionException。调用者可以捕获这个异常,然后根据需求编写自己的处理代码。当新提交的任务法保存到队列中等待执行时,“抛弃(Discard)” 策略会悄悄抛弃该任务。“抛弃最旧的(Discard-Oldest)" 策略则会抛弃下一个将被执行的任务,然后尝试重新提交新的任务。(如果工作队列是一个优先队列,那么“抛弃最旧的“策略将导致抛弃优先级最高的任务,因此最好不要将“抛弃最旧的"饱和策略和优先级队列放在一起使用。)
“调用者运行(Caller-Runs)" 策略实现了一种调节机制,该策略既不会抛弃任务,也不会抛出异常,而是将某些任务回退到调用者,从而降低新任务的流量。它不会在线程池的某个线程中执行新提交的任务,而是在一个调用了execute 的线程中执行该任务。
// 创建一个固定大小的线程池,井采用有界队列以及“调用者运行“饱和策略
ThreadPoolExecutor executor =
new ThreadPoolExecutor(NTHREADS, NTHREADS, 0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(CAPACITY));
// 当创建Executor 时,可以选择饱和策略或者对执行策略进行修改。
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
当工作队列被填满后,没有预定义的饱和策略来阻塞execute 。然而,通过使用Semaphore(信号量)来限制任务的到达率,就可以实现这个功能。
在程序BoundedExecutor 中给出了这种方法。该方法使用了一个无界队列(因为不能限制队列的大小和任务的到达率),并设置信号量的上界设置为线程池的大小加上可排队任务的数量,这是因为信号量需要控制正在执行的和等待执行的任务数量。
@ThreadSafe
public class BoundedExecutor {
private final Executor exec;
private final Semaphore semaphore;
public BoundedExecutor(Executor exec, int bound) {
this.exec = exec;
this.semaphore = new Semaphore(bound);
}
public void submitTask(final Runnable command)
throws InterruptedException {
semaphore.acquire();
try {
exec.execute(new Runnable() {
public void run() {
try {
command.run();
} finally {
semaphore.release();
}
}
});
} catch (RejectedExecutionException e) {
semaphore.release();
}
}
}
8.3.4 线程工厂
每当线程池需要创建一个线程时,都是通过线程工厂方法来完成的。默认的线程工厂方法将创建一个新的、非守护的线程,并且不包含特殊的配置信息。通过指定一个线程工厂方法,可以定制线程池的配置信息。在ThreadFactory 中只定义了一个方法newThread, 每当线程池需要创建一个新线程时都会调用这个方法。
在许多情况下都需要使用定制的线程工厂方法。例如,你希望为线程池中的线程指定一个UncaughtExceptionHandler, 或者实例化一个定制的Thread 类用于执行调试信息的记录。你还可能希望修改线程的优先级或者守护状态。或许你只是希望给线程取一个更有意义的名称,用来解释线程的转储信息和错误日志。
程序MyThreadF actory 中给出了一个自定义的线程工厂。它创建了一个新的MyAppThread 实例,并将一个特定于线程池的名字传递给MyAppThread 的构造函数,从而可以在线程转储和错误日志信息中区分来自不同线程池的线程.在应用程序的其他地方也可以使用MyAppThread, 以便所有线程都能使用它的调试功能。
public class MyThreadFactory implements ThreadFactory {
private final String poolName;
public MyThreadFactory(String poolName) {
this.poolName = poolName;
}
public Thread newThread(Runnable runnable) {
return new MyAppThread(runnable, poolName);
}
// 定制其他行为
}
如果在应用程序中需要利用安全策略来控制对某些特殊代码库的访问权限,那么可以通过Executor 中的privilegedThreadFactory 工厂来定制自己的线程工厂。通过这种方式创建出来的线程,将与创建privilegedThreadFactory 的线程拥有相同的访问权限、AccessControlContext 和contextClassLoader。如果不使用privilegedThreadFactory, 线程池创建的线程将从在需要新线程时调用execute 或submit 的客户程序中继承访问权限,从而导致令人困惑的安全性异常。
8.3.5 在调用构造函数后再定制ThreadPoolExecutor
在调用完ThreadPoolExecutor 的构造函数后,仍然可以通过设置函数(Setter) 来修改大多数传递给它的构造函数的参数(例如线程池的基本大小、最大大小、存活时间、线程工以及拒绝执行处理器(Rejected Execution Handler) )。如果Executor 是通过Executors 中的某个(newSingleTbreadExecutor 除外)工厂方法创建的,那么可以将结果的类型转换为 ThreadPoolExecutor 以访问设置器。
在Executors 中包含一个unconfigurableExecutorService 工厂方法,该方法对一个现有的ExecutorService 进行包装,使其只暴露出ExecutorService 的方法,因此不能对它进行配置。newSingleThreadExecutor 返回按这种方式封装的ExecutorService, 而不是最初的ThreadPoolExecutor 。虽然单线程的Executor 实际上被实现为一个只包含唯一线程的线程池,但它同样确保了不会并发地执行任务。如果在代码中增加单线程Executor 的线程池大小,那么将破坏它的执行语义。
你可以在自己的Executor 中使用这项技术以防止执行策略被修改。如果将ExecutorService暴露给不信任的代码,又不希望对其进行修改,就可以通过unconfigurableExecutorService 来包装它。·
8.4 扩展Thread Pool Executor
ThreadPoolExecutor 是可扩展的,它提供了几个可以在子类化中改写的方法: beforeExecute 、after Execute 和terminated, 这些方法可以用于扩展ThreadPoolExecutor 的行为。
在执行任务的线程中将调用beforeExecute 和afterExecute 等方法,在这些方法中还可以添加日志、计时、监视或统计信息收集的功能。无论任务是从run 中正常返回,还是抛出一个异常而返回, afterExecute 都会被调用。(如果任务在完成后带有一个Error, 那么就不会调用after Execute 。) 如果beforeExecute 抛出一个RuntimeException, 那么任务将不被执行,并且afterExecute 也不会被调用。
在线程池完成关闭操作时调用terminated, 也就是在所有任务都已经完成并且所有工作者线程也巳经关闭后。terminated 可以用来释放Executor 在其生命周期里分配的各种资源,此外还可以执行发送通知、记录日志或者收集finalize 统计信息等操作。
示例:给线程池添加统计信息
程序的TimingThreadPool 中给出了一个自定义的线程池,它通过before Execute 、afterExecute 和terminated 等方法来添加日志记录和统计信息收集。为了测量任务的运行时间, beforeExecute 必须记录开始时间井把它保存到一个afterExecute 可以访问的地方。因为这些方法将在执行任务的线程中调用,因此beforeExecute 可以把值保存到一个ThreadLocal 变量中,然后由afterExecute 来读取。在TimingThreadPool 中使用了两个AtomicLong 变扯,分别用于记录已处理的任务数和总的处理时间,井通过terminated 来输出包含平均任务时间的日志消息。
public class TimingThreadPool extends ThreadPoolExecutor {
public TimingThreadPool() {
super(1, 1, 0L, TimeUnit.SECONDS, null);
}
private final ThreadLocal<Long> startTime = new ThreadLocal<Long>();
private final Logger log = Logger.getLogger("TimingThreadPool");
private final AtomicLong numTasks = new AtomicLong();
private final AtomicLong totalTime = new AtomicLong();
protected void beforeExecute(Thread t, Runnable r) {
super.beforeExecute(t, r);
log.fine(String.format("Thread %s: start %s", t, r));
startTime.set(System.nanoTime());
}
protected void afterExecute(Runnable r, Throwable t) {
try {
long endTime = System.nanoTime();
long taskTime = endTime - startTime.get();
numTasks.incrementAndGet();
totalTime.addAndGet(taskTime);
log.fine(String.format("Thread %s: end %s, time=%dns",
t, r, taskTime));
} finally {
super.afterExecute(r, t);
}
}
protected void terminated() {
try {
log.info(String.format("Terminated: avg time=%dns",
totalTime.get() / numTasks.get()));
} finally {
super.terminated();
}
}
}
8.5 递归算法的井行化
当串行循环中的各个迭代操作之间彼此独立,并且每个迭代操作执行的工作篮比管理一个新任务时带来的开销更多,那么这个串行循环就适合并行化。
小结
对于井发执行的任务, Executor 框架是一种强大且灵活的框架。它提供了大量可调节的选项,例如创建线程和关闭线程的策略,处理队列任务的策略,处理过多任务的策略,井且提供了几个钓子方法来扩展它的行为。然而,与大多数功能强大的框架一样,其中有些设置参数并不能很好地工作,某些类型的任务需要特定的执行策略,而一些参数组合则可能产生奇怪的结果。
评论区