1. 程式人生 > 實用技巧 >7、java8並行流和序列流

7、java8並行流和序列流

1、概念

並行流就是把一個內容分成多個數據塊,並用不同的執行緒分別處理每個資料塊的流

Java8中將並行進行了優化,我們可以很容易的對資料進行並行操作。

Stream API可以申明性的通過parallel()sequential()在並行流與順序流之間進行切換

2、Fork/Join框架

2.1、概念

Fork/Join框架:就是在必要的情況下,將一個大任務,進行拆分fork)成若干個小任務(拆到不可再拆時),再講一個個的小任務運算的結果進行join彙總

2.2、Fork/Join框架與傳統執行緒池的區別

採用“工作竊取”模式(work-stealing):

當執行新的任務時,它可以將其拆分為更小的任務執行,並將小任務加到執行緒佇列中,然後再從一個隨機執行緒的佇列中偷一個並把它放在自己的佇列中。

相對於一般的執行緒池實現:

  • fork/join框架的優勢體現在對其中包含的任務的處理方式上,在一般的執行緒池中,如果一個執行緒正在執行的任務由於某些原因無法繼續執行,那麼該執行緒會處於等待狀態

  • fork/join框架實現中,如果某個子問題由於等待另外一個子問題的完成而無法繼續執行,那麼處理該子問題的執行緒就會主動尋找其它尚未執行的子問題來執行,這種方式減少了執行緒等待時間,提高了效能。

2.3、程式碼示例

package java8;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

/**
 * @Description: forkjoin框架demo
 * @Date: 2020/8/15 23:57
 * @Auther: zhaodi
 */
@Slf4j
public class ForkJoin extends RecursiveTask<Long> {
    // 任務劃分的細粒度
    private static Long MAX = 200l;
    // 子任務開始計算的值
    private Long startVal;
    // 子任務結束計算的值
    private Long endVal;
    public ForkJoin(Long startVal, Long endVal) {
        this.startVal = startVal;
        this.endVal = endVal;
    }
    @Override
    protected Long compute() {
        // 如果條件成立,說明這個任務所需要計算的數值分為足夠小了
        // 可以正式進行累加計算了
        if (endVal - startVal < MAX) {
            log.info("開始計算,startVal:{},endVal:{}", startVal, endVal);

            Long sum = 0l;
            for (long i = this.startVal; i <= this.endVal; i++) {
                sum += i;
            }
            return sum;
            // 否則再進行任務拆分,拆分成兩個任務
        } else {
            ForkJoin taskLeft = new ForkJoin(startVal, (startVal + endVal) / 2);
            taskLeft.fork();
            ForkJoin taskRight = new ForkJoin((startVal + endVal) / 2, endVal);
            taskRight.fork();
            return taskLeft.join() + taskRight.join();
        }
    }
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ForkJoinPool pool = new ForkJoinPool();
        ForkJoinTask<Long> t = pool.submit(new ForkJoin(1l,1000000000L));
        Long result = t.get();
        System.out.println(result); //504194303981605715
    }
}

3、Java8並行流

Fork/Join在實際編碼中還是很少使用的,因為使用起來還是比較複雜的,通過上面的程式碼我們就可以發現,用起來的難度係數還是比較大的。

但是我們就沒有更好的辦法解決這個問題嗎?答案是肯定有的,在Java8中我們提供了parallel()底層就是用Fork/Join框架實現的

/**
 * @Description:
 * @Date: 2020/8/12 23:16
 * @Auther: zhaodi
 */
public class ParalleStreamDemo {

    public static void main(String[] args) {
        List<Student> studentList = new ArrayList<>();
        studentList.add(new Student(1, "趙迪", 23));
        studentList.add(new Student(3, "tom", 25));
        studentList.add(new Student(2, "tom", 27));
        studentList.add(new Student(2, "rose", 22));
        studentList.parallelStream().filter(x->x.getAge()>23).collect(Collectors.toList());
    }
}