- Published on
Java并发-ExecutorService使用
- Authors
- Name
- Wenzhuo Zhao
在 Java 5 之后,并发编程引入了一堆新的启动、调度和管理线程的API。Executor
框架便是 Java 5 中引入的,其内部使用了线程池机制,它在 java.util.cocurrent
包下,通过该框架来控制线程的启动、执行和关闭,可以简化并发编程的操作。因此,在 Java 5之后,通过 Executor
来启动线程比使用 Thread 的 start 方法更好,除了更易管理,效率更好(用线程池实现,节约开销)外,还有关键的一点:有助于避免 this
逃逸问题。
补充:
this
逃逸是指在构造函数返回之前其他线程就持有该对象的引用. 调用尚未构造完全的对象的方法可能引发令人疑惑的错误。
Executor框架
任务(
Runnable
/Callable
) 执行任务需要实现的Runnable
接口 或Callable
接口。Runnable
接口或Callable
接口 实现类都可以被ThreadPoolExecutor
或ScheduledThreadPoolExecutor
执行。任务的执行(
Executor
) 如下图所示,包括任务执行机制的核心接口Executor
,以及继承自Executor
接口的ExecutorService
接口。ThreadPoolExecutor
和ScheduledThreadPoolExecutor
这两个关键类实现了ExecutorService
接口。
- 异步计算的结果(
Future
)Future
接口以及Future
接口的实现类FutureTask
类都可以代表异步计算的结果。 当我们把Runnable
接口 或Callable
接口 的实现类提交给ThreadPoolExecutor
或ScheduledThreadPoolExecutor
执行。(调用submit()
方法时会返回一个FutureTask
对象)
总结:
- 主线程首先要创建实现
Runnable
或者Callable
接口的任务对象。 - 把创建完成的实现
Runnable
/Callable
接口的对象直接交给ExecutorService
执行:ExecutorService.execute(Runnable command)
或者也可以提交给ExecutorService
执行:ExecutorService.submit(Runnable task)
或ExecutorService.submit(Callable <T> task)
。 - 如果执行
ExecutorService.submit(...)
,ExecutorService
将返回一个实现Future
接口的对象(我们刚刚也提到过了执行execute()
方法和submit()
方法的区别,submit()
会返回一个FutureTask
对象)。由于FutureTask
实现了Runnable
,我们也可以创建FutureTask
,然后直接交给ExecutorService
执行。 - 最后,主线程可以执行
FutureTask.get()
方法来等待任务执行完成。主线程也可以执行FutureTask.cancel(boolean mayInterruptIfRunning)
来取消此任务的执行。
ExecutorService简介
创建 ExecutorService
ExecutorService executorService0 = Executors.newSingleThreadExecutor();
创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor(); for (int i = 0; i < 10; i++) { final int index = i; singleThreadExecutor.execute(new Runnable() { @Override public void run() { try { System.out.println(index); Thread.sleep(2000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }); }
结果依次输出,相当于顺序执行各个任务。现行大多数GUI程序都是单线程的。Android中单线程可用于数据库操作,文件操作,应用批量安装,应用批量删除等不适合并发但可能IO阻塞性及影响UI线程响应的操作。
ExecutorService executorService1 = Executors.newCachedThreadPool();
创建一个可缓存的线程池,调用execute将重用以前构造的线程(如果线程可用)。如果现有线程没有可用的,则创建一个新线 程并添加到池中。终止并从缓存中移除那些已有 60 秒钟未被使用的线程。ExecutorService cachedThreadPool = Executors.newCachedThreadPool(); for (int i = 0; i < 10; i++) { final int index = i; try { Thread.sleep(index * 1000); } catch (InterruptedException e) { e.printStackTrace(); } cachedThreadPool.execute(new Runnable() { @Override public void run() { System.out.println(index); } }); }
线程池为无限大,当执行第二个任务时第一个任务已经完成,会复用执行第一个任务的线程,而不用每次新建线程。
ExecutorService executorService2 = Executors.newFixedThreadPool(10);
创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3); for (int i = 0; i < 10; i++) { final int index = i; fixedThreadPool.execute(new Runnable() { @Override public void run() { try { System.out.println(index); Thread.sleep(2000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }); }
因为线程池大小为3,每个任务输出index后sleep 2秒,所以每两秒打印3个数字。定长线程池的大小最好根据系统资源进行设置。如Runtime.getRuntime().availableProcessors()。可参考PreloadDataCache。
ExecutorService executorService3 = Executors.newScheduledThreadPool(10);
创建一个支持定时及周期性的任务执行的线程池,多数情况下可用来替代Timer类。
延迟执行:ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5); scheduledThreadPool.schedule(new Runnable() { @Override public void run() { System.out.println("delay 3 seconds"); } }, 3, TimeUnit.SECONDS);
表示延迟3秒执行。
定期执行:
scheduledThreadPool.scheduleAtFixedRate(new Runnable() { @Override public void run() { System.out.println("delay 1 seconds, and excute every 3 seconds"); } }, 1, 3, TimeUnit.SECONDS);
表示延迟1秒后每3秒执行一次。
使用ExecutorService
execute(Runnable)
submit(Runnable)
submit(Callable)
invokeAny()
invokeAll()
execute(Runnable)
方法
execute(Runnable)
接收一个java.lang.Runnable
对象作为参数,并且以异步的方式执行它。ExecutorService executorService = Executors.newSingleThreadExecutor(); executorService.execute(new Runnable() { public void run() { System.out.println("Asynchronous task"); } }); executorService.shutdown();
使用这种方式没有办法获取执行 Runnable 之后的结果,如果你希望获取运行之后的返回值,就必须使用 接收 Callable 参数的 execute() 方法,后者将会在下文中提到。
submit(Runnable)
方法
submit(Runnable)
同样接收一个Runnable 的实现作为参数,但是会返回一个Future
对象。这个Future
对象可以用于判断 Runnable是否结束执行。ExecutorService executorService = Executors.newSingleThreadExecutor(); Future future = executorService.submit(new Runnable() { public void run() { System.out.println("Asynchronous task"); } }); //如果任务结束执行则返回 null try{ System.out.println("future.get()=" + future.get()); }catch(InterruptedException e){ e.printStackTrace(); }catch(ExecutionException e){ e.printStackTrace(); } executorService.shutdown();
submit(Callable)
方法
submit(Callable)
和方法submit(Runnable)
比较类似,但是区别则在于它们接收不同的参数类型。Callable
的实例与Runnable
的实例很类似,但是Callable
的call()
方法可以返回结果。方法Runnable.run()
则不能返回结果。Callable
的返回值可以从方法submit(Callable)
返回的Future
对象中获取。ExecutorService executorService = Executors.newSingleThreadExecutor(); Future future = executorService.submit(new Callable<Integer>() { public Integer call() { System.out.println("Asynchronous task"); return 100; } }); //如果任务结束执行则返回100 try{ System.out.println("future.get()=" + future.get()); }catch(InterruptedException e){ e.printStackTrace(); }catch(ExecutionException e){ e.printStackTrace(); } executorService.shutdown();
invokeAny()
方法
invokeAny()
接收一个包含Callable
对象的集合作为参数,调用该方法不会返回 Future 对象,而是返回集合中某个Callable
对象的结果,而且无法保证调用之后返回的结果是哪个Callable
,只知道它是这些Callable
中一个执行结束的Callable
对象。 如果一个任务运行完毕或者抛出异常,方法会取消其它的 Callable 的执行。Set<Callable<String>> callables = new HashSet<Callable<String>>(); callables.add(new Callable<String>() { public String call() throws Exception { return "Task 1"; } }); callables.add(new Callable<String>() { public String call() throws Exception { return "Task 2"; } }); callables.add(new Callable<String>() { public String call() throws Exception { return "Task 3"; } }); String result = ""; try { result = executorService.invokeAny(callables); } catch (InterruptedException e) { e.printStackTrace(); } catch(ExecutionException e){ e.printStackTrace(); } System.out.println("result = " + result); executorService.shutdown();
随机返回一个完成的任务,从Task 1到3之中随机返回一个。
invokeAll()
方法
invokeAll()
会调用存在于参数集合中的所有 Callable 对象,并且返回一个包含 Future 对象的集合,你可以通过这个返回的集合来管理每个 Callable 的执行结果。 需要注意的是,任务有可能因为异常而导致运行结束,所以它可能并不是真的成功运行了。但是我们没有办法通过 Future 对象来了解到这个差异。List<Future<String>> futures = executorService.invokeAll(callables); callables.add(new Callable<String>() { public String call() throws Exception { return "Task 1"; } }); callables.add(new Callable<String>() { public String call() throws Exception { return "Task 2"; } }); callables.add(new Callable<String>() { public String call() throws Exception { return "Task 3"; } }); for (Future<String> afuture : futures) { try { System.out.println("future.get = " + afuture.get()); } catch (ExecutionException e) { e.printStackTrace(); } }
关闭ExecutorService
一般情况下,ExecutorService 并不会自动关闭,即使所有任务都执行完毕,或者没有要处理的任务,也不会自动销毁 ExecutorService 。它会一直出于等待状态,等待我们给它分配新的工作。
这种机制,在某些情况下是非常有用的,比如,,如果应用程序需要处理不定期出现的任务,或者在编译时不知道这些任务的数量。
但另一方面,这也带来了副作用:即使应用程序可能已经到达它的终点,但并不会被停止,因为等待的 ExecutorService 将导致 JVM 继续运行。这样,我们就需要主动关闭 ExecutorService。
要正确的关闭 ExecutorService,可以调用实例的 shutdown()
或 shutdownNow()
方法。
shutdown()
方法:executorService.shutdown();
shutdown()
方法并不会立即销毁 ExecutorService 实例,而是首先让 ExecutorService 停止接受新任务,并在所有正在运行的线程完成当前工作后关闭。shutdownNow()
方法:List<Runnable> notExecutedTasks = executorService.shutDownNow();
shutdownNow()
方法会尝试立即销毁 ExecutorService 实例,所以并不能保证所有正在运行的线程将同时停止。该方法会返回等待处理的任务列表,由开发人员自行决定如何处理这些任务。
因为提供了两个方法,因此关闭 ExecutorService 实例的最佳实战 ( 也是 Oracle 所推荐的 )就是同时使用这两种方法并结合 awaitTermination()
方法。
使用这种方式,ExecutorService 首先停止执行新任务,等待指定的时间段完成所有任务。如果该时间到期,则立即停止执行。
executorService.shutdown();
try {
if (!executorService.awaitTermination(800, TimeUnit.MILLISECONDS)) {
executorService.shutdownNow();
}
} catch (InterruptedException e) {
executorService.shutdownNow();
}
Future接口
Future就是对于具体的Runnable或者Callable任务的执行结果进行取消、查询是否完成、获取结果。必要时可以通过get方法获取执行结果,该方法会阻塞直到任务返回结果。
Future类位于java.util.concurrent
包下,它是一个接口:
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
cancel
方法用来取消任务,如果取消任务成功则返回true,如果取消任务失败则返回false。参数mayInterruptIfRunning
表示是否允许取消正在执行却没有执行完毕的任务,如果设置true,则表示可以取消正在执行过程中的任务。如果任务已经完成,则无论mayInterruptIfRunning
为true还是false,此方法肯定返回false,即如果取消已经完成的任务会返回false;如果任务正在执行,若mayInterruptIfRunning
设置为true,则返回true,若mayInterruptIfRunning
设置为false,则返回false;如果任务还没有执行,则无论mayInterruptIfRunning
为true还是false,肯定返回true。isCancelled
方法表示任务是否被取消成功,如果在任务正常完成前被取消成功,则返回 true。isDone
方法表示任务是否已经完成,若任务完成,则返回true;get()
方法用来获取执行结果,这个方法会产生阻塞,会一直等到任务执行完毕才返回;get(long timeout, TimeUnit unit)
用来获取执行结果,如果在指定时间内,还没获取到结果,就直接返回null。
也就是说Future提供了三种功能:
- 判断任务是否完成;
- 能够中断任务;
- 能够获取任务执行结果。
因为Future只是一个接口,所以是无法直接用来创建对象使用的,因此就有了下面的FutureTask
。
FutureTask
我们先来看一下FutureTask
的实现:
public class FutureTask<V> implements RunnableFuture<V>
FutureTask
类实现了RunnableFuture
接口,我们看一下RunnableFuture
接口的实现:
public interface RunnableFuture<V> extends Runnable, Future<V> {
void run();
}
可以看出RunnableFuture
继承了Runnable
接口和Future
接口,而FutureTask
实现了RunnableFuture
接口。所以它既可以作为Runnable
被线程执行,又可以作为Future
得到Callable
的返回值。
FutureTask
提供了2个构造器:
public FutureTask(Callable<V> callable) {
}
public FutureTask(Runnable runnable, V result) {
}
事实上,FutureTask是Future接口的一个唯一实现类。
使用Callable+FutureTask:
public class Test {
public static void main(String[] args) {
//第一种方式
ExecutorService executor = Executors.newCachedThreadPool();
Task task = new Task();
FutureTask<Integer> futureTask = new FutureTask<Integer>(task);
executor.submit(futureTask);
executor.shutdown();
//第二种方式,注意这种方式和第一种方式效果是类似的,只不过一个使用的是ExecutorService,一个使用的是Thread
/*Task task = new Task();
FutureTask<Integer> futureTask = new FutureTask<Integer>(task);
Thread thread = new Thread(futureTask);
thread.start();*/
try {
Thread.sleep(1000);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
System.out.println("主线程在执行任务");
try {
System.out.println("task运行结果"+futureTask.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
System.out.println("所有任务执行完毕");
}
}
class Task implements Callable<Integer>{
@Override
public Integer call() throws Exception {
System.out.println("子线程在进行计算");
Thread.sleep(3000);
int sum = 0;
for(int i=0;i<100;i++)
sum += i;
return sum;
}
}
参考资料:
线程池学习总结