Fork/Join framework trong Java
Fork/Join framework là một implementation của ExecutorService interface, nó giúp bạn có thể tận dụng được lợi thế của bộ xử lý nhiều luồng. Fork/Join framework được thiết kế cho các công việc mà có thể chia nhỏ liên tục thành các phần nhỏ hơn (chia để trị).
Mục đích của việc này là sử dụng tất cả sức mạnh tính toán của hệ thống để cải thiện hiệu suất cho ứng dụng.
Fork/Join framework phân chia các tasks cho các worker threads trong một thread pool. Nó sử dụng thuật toán work-stealing algorithm. Các worker threads khi chạy xong có thể lấy tasks từ các threads khác khi chúng đang bận.
Fork/Join framework được xây dựng xung quanh ForkJoinPool class, là một mở rộng của AsbtractExecutorService class. ForkJoinPool hiện thực thuật toán work-stealing và có thể chạy các ForkJoinTask processes.
Cách sử dụng
Cơ bản thì ta sử dụng Chia để trị là cách tiếp cận cho các bài toán sử dụng Fork/Join framework: bài toán hoặc các tasks được chia nhỏ liên tục đến khi nào kích thước đủ nhỏ để tính toán dễ dàng hiệu quả bằng cách trực tiếp.
Đây là mã giả mô tả cho cách tiếp cận trên
if (my portion of the work is small enough)
do the work directly
else
split my work into two pieces
invoke the two pieces and wait for the results
Sau đó, ta wrap đoạn code có được ở trên trong một ForkJoinTask subclass, thường thì các lớp dẫn xuất của nó được sử dụng như RecursiveTask (có thể return kết quả) hoặc RecursiveAction.
Code Example:
Calculate sum of array.
public class SumParallel extends RecursiveAction {
public static final int MIN_PIECE = 500;
private final int[] array;
private final int low, high;
private int sum;
public SumParallel(int[] array, int low, int high) {
this.array = array;
this.low = low;
this.high = high;
}
@Override
protected void compute() {
if (this.low <= this.high - MIN_PIECE) {
for (int i = low; i <= high; i++) {
sum += array[i];
}
} else if (this.low > this.high) {
this.sum = 0;
} else {
int mid = (this.low + this.high) / 2;
SumParallel lowSum = new SumParallel(this.array, this.low, mid);
SumParallel highSum = new SumParallel(this.array, mid + 1, this.high);
invokeAll(lowSum, highSum);
this.sum = lowSum.sum + highSum.sum;
}
}
public static void main(String[] args) {
int ARRAY_SIZE = 1000000;
int[] array = IntStream.rangeClosed(1, ARRAY_SIZE).toArray();
// Sum with ForkJoin
SumParallel sumParallel = new SumParallel(array, 0, array.length - 1);
ForkJoinPool pool = new ForkJoinPool();
long startTime = System.currentTimeMillis();
pool.invoke(sumParallel);
long endTime = System.currentTimeMillis();
System.out.println("Sum parallel took " + (endTime - startTime) +
" milliseconds.");
System.out.println("Sum = " + sumParallel.sum);
// Sum without ForkJoin
int sum = 0;
startTime = System.currentTimeMillis();
for (int i : array) {
sum += i;
}
endTime = System.currentTimeMillis();
System.out.println("Sum took " + (endTime - startTime) +
" milliseconds.");
System.out.println("Sum = " + sum);
}
}
Máy của mình có 2 nhân 4 luồng, nên với MIN_PIECE = 500 và ARRAY_SIZE = 1000000 thì việc tính tổng sử dụng fork/join nhanh hơn. Tuy nhiên, với các số liệu khác thì có thể dẫn đến việc tính toán dùng fork/join chậm hơn. Nguyên nhân là do việc tính toán đồng thời có overhead.
Bên cạnh sử dụng fork/join framework để hiện thực các thuật toán mà các tasks có thể thực hiện đồng thời trên hệ thống có nhiều lõi, có một số tính năng nổi bật và hữu ích trong Java SE áp dụng fork/join framework. Ví dụ như java.util.Arrays class.parallelSort(). Trong Java SE 8 thì có thêm các methods trong java.util.streams package.
Nhận xét
Đăng nhận xét