چارچوب fork/join یک پیادهسازی برای ExecuterService ارایه داده است و هدف آن استفاده از تمام قدرت پردازشی موجود برای بالا بردن کارایی برنامه است. با استفاده از این چارچوب میتوان کار را به صورت بازگشتی به تکههای کوچکتر تقسیم کرد و آنها را به صورت موازی انجام داد.
این چارچوب مانند سایر پیادهسازیهای ExecuterService، کارها را بین thread های یک pool thread توزیع میکند؛ اما تفاوت آن با سایر پیادهسازیهای ExecuterService این است که از یک الگوریتم work-stealing (سرقت کار) استفاده میکند. در این الگوریتم thread هایی که کاری برای انجام ندارند، کارهای thread های مشغول را میدزدند و به عهده میگیرند.
برای استفاده از چارچوب fork/join باید ابتدا کدمان را به صورت بازگشتی بنویسیم (چیزی شبیه به شبه کد زیر):
if (my portion of the work is small enough)
do the work directly
else
split my work into two pieces
invoke the two pieces and wait for the results
کلاسی که این کد را در آن مینویسیم باید زیرکلاسِ RecursiveTask یا RecursiveAction باشد و برای اجرای عملیات، یک instance از آن را به متد invoke از کلاس ForkJoinPool پاس میدهیم و این متد را فراخوانی میکنیم.
مثال: یافتن بیشینه مقدار یک آرایه
در شبه کد زیر، به صورت بازگشتی آرایه به دو بخش چپ و راست تقسیم شده و بیشینه در هر بخش محاسبه میشود. تا وقتی که اندازه مسئله (آرایه) از آستانهای کمتر نشده باشد این تقسیم ادامه خواهد داشت. در صورتی که اندازه مسئله از آستانه کمتر شده باشد، محاسبات به صورت عادی (ترتیبی) انجام میشود.
Result solve(Problem problem) {
if (problem.size < SEQUENTIAL_THRESHOLD)
return solveSequentially(problem);
else {
Result left, right;
INVOKE-IN-PARALLEL {
left = solve(extractLeftHalf(problem));
right = solve(extractRightHalf(problem));
}
return combine(left, right);
}
}
در کد زیر الگوریتم فوق به کمک چارچوب fork/join پیادهسازی شده است:
import java.util.Random;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
public class MaximumFinder extends RecursiveTask<Integer> {
private static final int SEQUENTIAL_THRESHOLD = 5;
private final int[] data;
private final int start;
private final int end;
public MaximumFinder(int[] data, int start, int end) {
this.data = data;
this.start = start;
this.end = end;
}
public MaximumFinder(int[] data) {
this(data, 0, data.length);
}
@Override
protected Integer compute() {
final int length = end - start;
if (length < SEQUENTIAL_THRESHOLD) {
return computeDirectly();
}
final int split = length / 2;
final MaximumFinder left = new MaximumFinder(data, start, start + split);
left.fork();
final MaximumFinder right = new MaximumFinder(data, start + split, end);
return Math.max(right.compute(), left.join());
}
private Integer computeDirectly() {
System.out.println(Thread.currentThread() + " computing: " + start
+ " to " + end);
int max = Integer.MIN_VALUE;
for (int i = start; i < end; i++) {
if (data[i] > max) {
max = data[i];
}
}
return max;
}
public static void main(String[] args) {
// create a random data set
final int[] data = new int[1000];
final Random random = new Random();
for (int i = 0; i < data.length; i++) {
data[i] = random.nextInt(100);
}
// submit the task to the pool
final ForkJoinPool pool = new ForkJoinPool(4);
final MaximumFinder finder = new MaximumFinder(data);
System.out.println(pool.invoke(finder));
}
}