【 ForkJoinTask分支任务 】
分之合并的设计思想主要是来自于CPU运行环境,ForkJoinTask是在JDK1.7之后追加到java之中的一个类库,
其主要的功能是进行资源窃取功能的实现。
❥ 但需要注意的是,在分支合并任务的处理结构中一定要注意以下的几点:
• 分支任务之中的数据的同步处理一定要有分支任务自己来完成,不要求进行额外的控制;
• 在进行分支处理操作的时候不要进行IO操作;
• 由于分支任务是捆绑一起执行的,如果出现了异常千万别抛出,会整体任务失败。
❥ 在进行分支任务的处理之中主要使用如下的几个核心类:
• ForkJoinTask : 进行分支合并任务的处理类;
• ForkJoinPool : 分支合并池。
【 分支任务类结构 】
【 有返回结果的任务:RecursiveTask 】
下面以一个简单的1—100累加来实现一个分支的处理任务,如果要进行1—100的累加,
可以将任务分为两个阶段:1—50累加,第二个是进行51—100的累加。
范例:实现分支处理
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;
public class TestDemo {
public static void main(String[] args) throws Exception {
// 从0-100的累加处理操作
AddTask task = new AddTask(0, 100);
ForkJoinPool pool = new ForkJoinPool() ;
// 提交任务
Future<Integer> future = pool.submit(task);
System.out.println(future.get());
}
}
@SuppressWarnings("serial") // 相当于继承了ForkJoinTask父类
class AddTask extends RecursiveTask<Integer> {
private int start;
private int end;
// 传入计算的开始和结束的值
public AddTask(int start, int end) {
this.start = start;
this.end = end;
}
@Override // 是进行数据的分支处理操作
protected Integer compute() {
// 保存求和的计算结果
int sum = 0;
if (end - start < 100) { // 开启了分支
for (int x = start; x <= end; x++) {
sum += x;
}
} else {
int middle = (start + end) / 2; // 中间值
// 做0 - 50累加
AddTask leftTask = new AddTask(start, middle);
// 做51 - 100累加
AddTask rightTask = new AddTask(middle + 1, end);
// 表示开启下一个分支计算,开启的是computer()
leftTask.fork();
// 表示开启下一个分支计算,开启的是computer()
rightTask.fork();
// 把两个的分支结果进行合并
sum = leftTask.join() + rightTask.join();
return sum;
}
return sum;
}
} // 输出结果:5050
整个的分支合并之中相当于就是一个线程池的扩展概念,在整体的计算之中,每一个分支都会产生一个新的线程对象进行计算,唯一的区别是,默认情况下的线程池是由用户自己来设置的线程对象,而ForkJoin是由任务类自己根据情况进行拆分处理。
【 没有返回值的任务:RecursiveAction 】
很多的时候进行分支处理的时候有可能是没有返回值的,所以可以修改一下任务的继承父类”RecursiveAction”。
范例:定义分支任务
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class MLDNTestDemo {
public static void main(String[] args) throws Exception {
CountSave save = new CountSave() ;
// 从0-100的累加处理操作
AddTask task = new AddTask(save,0, 100);
ForkJoinPool pool = new ForkJoinPool() ;
pool.submit(task);// 提交任务
// 当前的任务没有结束
while(!task.isDone()) {
TimeUnit.MILLISECONDS.sleep(100);
}
// 分支任务计算完成
if (task.isCompletedNormally()) {
System.out.println("计算完成了:" + save.getSum());
}
}
}
class CountSave { // 保存数据处理结果
private Lock lock = new ReentrantLock() ;
private int sum = 0 ; // 保存处理结果
public void add(int sum) {
this.lock.lock();
try {
this.sum += sum ;
} finally { this.lock.unlock(); }
}
public int getSum() {
return this.sum ;
}
}
// 相当于继承了ForkJoinTask父类
@SuppressWarnings("serial")
class AddTask extends RecursiveAction {
private int start;
private int end;
private CountSave save ;
// 传入计算的开始和结束的值
public AddTask(CountSave save,int start, int end) {
this.save = save ;
this.start = start;
this.end = end;
}
@Override
protected void compute() {
// 开启了分支
if (end - start < 100) {
int sum = 0; // 保存求和的计算结果
for (int x = start; x <= end; x++) {
sum += x;
}
this.save.add(sum); // 保存计算结果
} else {
// 中间值
int middle = (start + end) / 2;
// 做0 - 50累加
AddTask leftTask = new AddTask(this.save,start, middle);
// 做51 - 100累加
AddTask rightTask = new AddTask(this.save,middle + 1, end);
// 并行执行的任务
super.invokeAll(leftTask, rightTask);
}
}
}
计算完成了:5050
实际上在使用ForkJoinTask处理的时候还可以取得使用的线程的消息。
public class TestDemo {
public static void main(String[] args) throws Exception {
CountSave save = new CountSave() ;
// 从0-100的累加处理操作
AddTask task = new AddTask(save,0, 100);
ForkJoinPool pool = new ForkJoinPool() ;
// 提交任务
pool.submit(task);
while(!task.isDone()) { // 当前的任务没有结束
System.out.println("活跃线程:" + pool.getActiveThreadCount()
+ "、最大的并发线程数:" + pool.getParallelism());
// TimeUnit.MILLISECONDS.sleep(100);
}
if (task.isCompletedNormally()) { // 分支任务计算完成
System.out.println("计算完成了:" + save.getSum());
}
}
}
活跃线程:1、最大的并发线程数:8
活跃线程:2、最大的并发线程数:8
活跃线程:2、最大的并发线程数:8
活跃线程:2、最大的并发线程数:8
活跃线程:2、最大的并发线程数:8
活跃线程:2、最大的并发线程数:8
活跃线程:1、最大的并发线程数:8
活跃线程:1、最大的并发线程数:8
活跃线程:1、最大的并发线程数:8
活跃线程:1、最大的并发线程数:8
计算完成了:5050
也就是说如果你现在要考虑所有底层的设计因素,那么就必须针对当前的主机硬件环境做出判断后才能够写出
良好的分支处理操作。