本教程共分为三个部分
- 第一部分:线程(Thread)与执行体(Executors)
- 第二部分:同步(Synchronization)与锁(Locks)
- 第三部分:原子变量与ConcurrentMap
欢迎浏览Java8并发教程的第一部分.本教程致力于使用简单而易于理解的代码实例来教授你关于java8中并发编程一些知识。接下来你会学到如何使用线程,任务(tasks)以及执行体(executor)来是你的代码并行的运行起来。
Java 并发API于Java5首次加入,在后来发布的版本中不断迭代完善。本文中出现的大部分概念也适合java8以下的版本,不单单针对java8
线程(Threads)和Runnables
所有现代操作系统都通过进程与线程支持了并发操作。进程是一些程序的运行实例,他通常是独立运行互不干扰的。例如你打开一个java程序时操作系统就孵化出一个新的进程来运行你的程序,而你的程序时和其他程序并行执行的。在这些进程里面我们可以利用线程来并发的执行我们的代码,这样我们就可以最大限度的利用多核CUP的优势。
Java从 JDK1.0 就支持线程了。在启动一个新线程之前你首先要告诉这个线程要完成什么任务,这个要完成的任务通常被叫做 task 。这个任务可以通过实现 Runnable
接口,这是一个里面只定义了一个无参数函数 run()
的函数接口,下面是它的一个例子:
Runnable task = () -> {
String threadName = Thread.currentThread().getName();
System.out.println("Hello " + threadName);
};
task.run();
Thread thread = new Thread(task);
thread.start();
System.out.println("Done!");
由于Runnable
是一个函数接口,所以我们可以使用Java 8 的Lamdda
表达式将当前执行线程的名称打印到控制台上。我们在启动新线程之前让runnable直接在主线程中执行了。
在你的控制台会输出类似下面的代码:
Hello main
Hello Thread-0
Done!
或者
Hello main
Done!
Hello Thread-0
之所以出现上面那种情况是因为在线程并发执行时,我们是无法预知runnable是在打印‘done’那条语句之前执行还是之后执行。这两个线程的执行时不可预知的,所以说要在大型程序中进行并发编程是一个很复杂的任事情。
我们可以让线程睡眠一段时间来模拟一个耗时操作,例如下面的例子
Runnable runnable = () -> {
try {
String name = Thread.currentThread().getName();
System.out.println("Foo " + name);
TimeUnit.SECONDS.sleep(1);
System.out.println("Bar " + name);
}
catch (InterruptedException e) {
e.printStackTrace();
}
};
Thread thread = new Thread(runnable);
thread.start();
当你执行上面的代码时,两句打印语句之间会相差1秒钟。TimeUnit
是一个操作时间的一个比较方便的枚举类。除此之外,你还可以使用 Thread.sleep(1000)
来达到同样的效果。
使用Thread 类非常的乏味而且易错。因而在2004年发布的 Java 5 中引入了 并发API. 这个API 位于 java.util.concurrent
包内,这个包里面包含了很多处理并发编程时非常用的类。自从并发API发布以来,每一个java版本发布时都对其进行加强,Java 8 也不例外。
那么让我们深入的研究一下并发API中最重要的部分–Executor services
Executors
并发 API 引入了 ExecutorService
来替代直接使用Threads编程。Executors 可以运行异步任务,它一般会维护这一个线程池,所以我们可以不去手动创建线程。线程池内部的线程可以被重复利用来执行任务,所以我们可以在我们程序的整个生命周期内只使用一个 executor 服务来处理无限多的并发任务。
下面我们使用 Executors 重写我们前面的例子
ExecutorService executor = Executors.newSingleThreadExecutor();
executor.submit(() -> { String threadName = Thread.currentThread().getName(); System.out.println("Hello " + threadName); }); // => Hello pool-1-thread-1
Executors
提供了很多工厂方法来创建各种Executor 服务实例。在这个例子中我们创建了一个只有一个线程的executor。
代码运行结果似乎会和使用 Thread 那个例子一样,但是一旦你运行程序就会发现不一样:这个程序用于不会停止!Executors必须手动停止,而不像线程那样执行完就自动停止了,如果你不手动停止Executors,它就会一直监听是否有新的任务要执行。
ExecutorService
有两个方法可以用来结束:shutdown()
与shutdownNow()
.这两个方法有少许不同,shutdown()
是等待当前正在运行的任务完成后再结束,而 shutdownNow()
中断所有正在运行的任务,立即结束。
下面是我认为比较优雅的结束executors的方法
try {
System.out.println("attempt to shutdown executor");
executor.shutdown();
executor.awaitTermination(5, TimeUnit.SECONDS);
}
catch (InterruptedException e) {
System.err.println("tasks interrupted");
}
finally {
if (!executor.isTerminated()) {
System.err.println("cancel non-finished tasks");
}
executor.shutdownNow();
System.out.println("shutdown finished");
}
上面的方法是在结束executor之前等待一段时间,在此时间段内程序有可能结束也有可能没有结束,只要过了这个时间如果executor仍然没有结束则立即结束此执行体。
Callables and Futures
除了Runnable
executors 还支持另一种taskCallable
. Callable 与 Runnable 一样,也是一个函数接口,但是她是有返回值的。下面的lambda表达式定义了一个callable,其在1秒后返回一个整数。
Callable<Integer> task = () -> { try { TimeUnit.SECONDS.sleep(1); return 666; } catch (InterruptedException e) { throw new IllegalStateException("task interrupted", e); } };
与Runnable一样,Callable也是可以提交给Executor去执行的。但是Callable是有返回值的,ExecutorService 的
submit()
方法是不会等待执行任务完成的,它是异步的,executor 服务不能直接返回Callable 的返回值。所以 executor 会返回一个Future
类型的结果A,callable的执行结果就存放在A里,然后当需要的时候就可以从A里面取出。ExecutorService executor = Executors.newFixedThreadPool(1); Future<Integer> future = executor.submit(task); System.out.println("future done? " + future.isDone()); Integer result = future.get(); System.out.println("future done? " + future.isDone()); System.out.print("result: " + result);
我们将上面沉睡1秒后返回
666
的那个task交给Executor 执行,结果存放在future
变量里。然后打印这个task是否执行完毕(由于我们的task是要沉睡1秒钟的,所以future.isDone()
肯定是false),接着我们调用future.get()
方法获取执行结果,这个方法是个阻塞方法,一直会阻塞当前线程到task执行完毕。程序执行结果如下future done? false future done? true result: 666
Futures 与Executors 服务底层联系非常紧密,如果在future没有完成的情况下就结束executor,future将引发异常
Timeouts
如上所述,调用
future.get()
方法会阻塞当前线程直到callback结束,但是如果callback永远不结束呢?那样就会造成我们的程序失去响应,对于这种情况我们可以设置超时来处理ExecutorService executor = Executors.newFixedThreadPool(1); Future<Integer> future = executor.submit(() -> { try { TimeUnit.SECONDS.sleep(2); return 666; } catch (InterruptedException e) { throw new IllegalStateException("task interrupted", e); } }); future.get(1, TimeUnit.SECONDS);
执行上面的代码会产生一个异常
TimeoutException
,类似于下面所示Exception in thread "main" java.util.concurrent.TimeoutException at java.util.concurrent.FutureTask.get(FutureTask.java:205)
这是由于我们设置的callable需要至少2秒才能完成,而我们设置的超时只有1秒,所以必然会引发异常。
InvokeAll
Executors 调用
invokeAll()
方法可以批量调用callable。这个方法接受一个callable的集合然后返回一个future的列表。ExecutorService executor = Executors.newWorkStealingPool(); List<Callable<String>> callables = Arrays.asList( () -> "task1", () -> "task2", () -> "task3"); executor.invokeAll(callables) .stream() .map(future -> { try { return future.get(); } catch (Exception e) { throw new IllegalStateException(e); } }) .forEach(System.out::println);
在这个例子中,我们使用了Java 8 中
stream
功能,它可以以流式化的形式处理返回结果。上面代码的大意为:将invokeAll()
返回结果(Fruture类型集合)流化后通过map()
函数将future
映射成callback的执行结果,最后调用forEach()
函数打印到控制台。如果对Java 8 的这些特性不是很熟悉请参考 Java 8 StreamInvokeAny
另一个批量执行callable的方法是
invokeAny()
,它与invokeAll()
不同之处在于,它是只要加入它里面的callable其中有一个执行完成后就返回那个callable的结果Callable<String> callable(String result, long sleepSeconds) { return () -> { TimeUnit.SECONDS.sleep(sleepSeconds); return result; }; } ExecutorService executor = Executors.newWorkStealingPool(); List<Callable<String>> callables = Arrays.asList( callable("task1", 2), callable("task2", 1), callable("task3", 3)); String result = executor.invokeAny(callables); System.out.println(result); // => task2
上面的例子中我们创建了另一种Executor 服务
newWorkStealingPool()
,这是一个ForkJoinPool
类型的服务,这种线程池默认会根据CPU 的核数来创建线程,例如CPU 是4核的,那这个线程池默认容量为4.Scheduled Executors
我们已经学习了如何使用一个executor 执行任务,下面我们看一下如何定时执行,以及循环执行的问题。这会使用到ScheduledExecutorService
。
下面的代码演示了让一个任务3秒后开始执行
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
Runnable task = () -> System.out.println("Scheduling: " + System.nanoTime());
ScheduledFuture<?> future = executor.schedule(task, 3, TimeUnit.SECONDS);
TimeUnit.MILLISECONDS.sleep(1337);
long remainingDelay = future.getDelay(TimeUnit.MILLISECONDS);
System.out.printf("Remaining Delay: %sms", remainingDelay);
scheduleAtFixedRate()
与scheduleWithFixedDelay()
两个函数可以达到使任务周期执行。第一个函数,使任务我按一定的周期触发执行,如下例子
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
Runnable task = () -> System.out.println("Scheduling: " + System.nanoTime());
int initialDelay = 0;
int period = 1;
executor.scheduleAtFixedRate(task, initialDelay, period, TimeUnit.SECONDS);
scheduleAtFixedRate()
函数没有考虑任务执行所需要的时间,例如你的执行间隔为1秒,但是任务本身执行时间要2秒,那么你的线程池很快就会膨胀。
如果是上面的那种情况,我们应该考虑使用scheduleWithFixedDelay()
,这个函数的周期间隔是从上一个任务结束到下一个任务开始的时间。例如你设置的执行间隔我1秒,任务执行时间为2秒。那么实际执行间隔就是1+2=3秒
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
Runnable task = () -> {
try {
TimeUnit.SECONDS.sleep(2);
System.out.println("Scheduling: " + System.nanoTime());
}
catch (InterruptedException e) {
System.err.println("task interrupted");
}
};
executor.scheduleWithFixedDelay(task, 0, 1, TimeUnit.SECONDS);
如果你不能保证任务的执行时间小于你要设定的周期执行时间间隔,就应该使用这个函数。