1. 程式人生 > >JAVA8 stream 中Spliterator的使用(二)

JAVA8 stream 中Spliterator的使用(二)

JAVA8 stream 中Spliterator的使用(一)給出了Spliterator的兩種使用,但是遺憾的是,程式碼並不正確。這篇說明下原因,並對Spliterator進行更深入的分析。

  1. 首先來看下sorted方法,將程式碼呼叫countNum處註釋掉,改為如下方法:
parallelStream.sorted().forEach(System.out::print);

程式碼將報錯。

Exception in thread "main" java.lang.NullPointerException
    at java.util.Comparators$NaturalOrderComparator.compare(Comparators.java:52)
    at java.util.Comparators$NaturalOrderComparator.compare(Comparators.java:1)

    at java.util.TimSort.binarySort(TimSort.java:296)
    at java.util.TimSort.sort(TimSort.java:239)
    at java.util.Arrays.parallelSort(Arrays.java:1113)
    at java.util.stream.SortedOps$OfRef.opEvaluateParallel(SortedOps.java:158)
    at java.util.stream.AbstractPipeline.opEvaluateParallelLazy(AbstractPipeline.java:704)
    at java.util.stream.AbstractPipeline.sourceSpliterator(AbstractPipeline.java:431)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
    at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)
    at T2.main(T2.java:13)

跟進程式碼中看,在Comparators處打斷點進行除錯(預設jdk的source包是不能打斷點的,需要自己重新打包出含有除錯資訊的source包),在

java.util.stream.SortedOps.OfRef<T>類的T[] flattenedData = helper.evaluate(spliterator, true, generator).asArray(generator);

方法中,解析出的flattenedData陣列中有null的元素。跟進helper.evaluate方法,如果是並行流,在

java.util.stream.Nodes.collect(PipelineHelper<P_OUT>, Spliterator<P_IN>, boolean, IntFunction<P_OUT[]>)

方法中會首先初始化一個全null的陣列。後面的邏輯是將資料中的元素根據Spliterator分割後的各元素插入到這個數組裡面。

問題就出在(一)中的第二個Spliterator修改了原始的char[]陣列的內容,因為在trySplit方法中,將char[]陣列中不是數字的char給忽略了,trySplit分割後的兩個Spliterator都沒有處理非數字的char,這樣會導致上文中全null陣列中在非數字index的位置沒有被元素填充,導致在sorted比較的時候報出空指標錯誤。

這裡暴露的問題說明,在編寫Spliterator的時候,不能修改stream的元素內容,這和stream不可修改性也是一脈相承的。
2. 修改程式碼,改成在trySplit方法中將非數字的char劃歸到分割後的第一個Spliterator中。
執行程式碼,報如下錯誤

Exception in thread "main" java.lang.ArrayIndexOutOfBoundsException: 1
    at java.util.stream.SortedOps$SizedRefSortingSink.accept(SortedOps.java:364)
    at NumCounterSpliterator2.tryAdvance(NumCounterSpliterator2.java:24)
    at java.util.Spliterator.forEachRemaining(Spliterator.java:326)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
    at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151)
    at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:1)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)
    at NumCounterTest2.main(NumCounterTest2.java:32)

跟進原始碼

    java.util.stream.Nodes.SizedCollectorTask.OfRef.accept(P_OUT)中
            public void accept(P_OUT value) {
                if (index >= fence) {
                    throw new IndexOutOfBoundsException(Integer.toString(index));
                }
                array[index++] = value;
            }

發現stream的sorted實現時,會根據estimateSize返回的值賦值給fence,如果進行排序比較的元素的index值超過estimateSize返回值,就會丟擲異常。因此,在sorted使用過程中,estimateSize方法並不是一個可以隨意返回值的。

修改方式有兩種,另一種修改方式在後面將stream的characteristics引數時介紹。
1.將estimateSize方法改成準確的計算方式即可:
@Override
public long estimateSize() {
return end – currentChar + 1;
}

編碼過程中還發現一個小問題,用parallel並行stream的時候,遍歷元素是需要採用forEachOrdered而不是forEach方法,具體可以參見【1】

3. stream的characteristics引數介紹

基本引數使用,大家可以參見原始碼註釋,這裡介紹下一些注意的地方:
3.1 java.util.Spliterator.DISTINCT 屬性 表明該stream已經是distinct的了,因此,如果Spliterator含有此屬性,則在stream.distinct()呼叫的時候,是直接輸出該stream的,也就是distinct方法不進行通常意義上的唯一性過濾。
舉例:
將文末示例程式碼中的characteristics方法返回值,加入DISTINCT屬性。即:

    public int characteristics() {
        return ORDERED | SIZED | SUBSIZED | NONNULL | IMMUTABLE |DISTINCT;
    }

執行如下程式碼:

        String arr = "123123";
        System.out.println(arr);

        Spliterator<Character> spliterator = new NumCounterSpliterator2(0,arr.length()-1,arr.toCharArray(),true);
        // 傳入true表示是並行流
        Stream<Character> parallelStream = StreamSupport.stream(spliterator, true);

        parallelStream.distinct().forEach(System.out::print);

結果輸出:
123123
123123

可見並未做唯一性處理。如果去掉distinct屬性,則輸出結果:
123123
132

3.2 SORTED屬性

3.2.1. SORTED屬性和getComparator一起使用。如果該流已經按照預設字典序(natural order)排序好了,則返回null。
如果將Spliterator的getComparator返回null,並且設定SORTED屬性,則sorted()方法直接返回原stream流,不會做任何排序,原因和distinct相同,因為此流已經排好序了。

如果stream是非並行流,則返回直接和原stream流相同。如果是並行流,注意,因為並行流是會被trysplit處理的,每個分割後的Spliterator是sorted的。因為流屬性已經是sorted並且返回的getComparator是null,已經是排好序的了,因此每個子執行緒分割後的Spliterator直接輸出即可。

但是這裡注意,stream本質上底層是f/j程式碼,而f/j分割時候,是基於trySplit進行分割的。查看了java.util.stream.Streams.RangeIntSpliterator原始碼後發現,trySplit的分割是需要從[begin,end]返回一個以begin 為開始的Spliterator,例如分割為[begin,end1],將當前Spliterator的begin修改為end1+1,即分割為[end1+1,end].
原因應該是f/j的子執行緒fork和join有關,因為我們直到fork和join應該是相反序來寫的。例如:

f1.fork();
f2.fork();
f2.join();
f1.join();

因此,從f/j的多執行緒棧來說,f2 在 f1的上面,f2.join會導致f2先執行。return的[begin,end1]保證了先執行,
而f1的[end1+1,end] 任務後執行,這樣才是以encounter order順序執行的併發。
因此,程式碼中trysplit應該這麼寫:

    public Spliterator&lt;Character&gt; trySplit() {
        int i = currentChar;
        int currentCharOld = currentChar;
        for(;canSplit &amp;&amp; i &lt;= end; ++i){
            if(!Character.isDigit(str[i])){
                int splitBeforeEnd = end;
                canSplit = false;
                if(i + 1 &lt;= splitBeforeEnd){
                    currentChar = i + 1;
                    return new NumCounterSpliterator3(currentCharOld,i,str,true);
                }else{
                    return null;
                }
            }
        }
        canSplit = false;
        return null;
    }

而不能寫成

    @Override
    public Spliterator&lt;Character&gt; trySplit() {
        int i = currentChar;
        for(;canSplit &amp;&amp; i &lt;= end; ++i){
            if(!Character.isDigit(str[i])){
                int splitBeforeEnd = end;
                end = i ;
                canSplit = false;
                if(i + 1 &lt;= splitBeforeEnd){
                    return new NumCounterSpliterator2(i+1,splitBeforeEnd,str,true);
                }else{
                    return null;
                }
            }
        }

        canSplit = false;
        return null;
    }

。當然如上文說,如果不是並行流,不涉及trysplit方法,則getComparator返回null,直接就返回的原始流

3.2.2 3.2.1講了如果返回的是null的情況,那麼如果返回的不是null呢?很不幸,那設定了sorted和沒設定沒有任何區別,
即使你用hasCharacteristics(Spliterator.SORTED)方法,的確返回true。
為什麼?
看下原始碼:

java.util.stream.StreamOpFlag.fromCharacteristics(Spliterator)
        if ((characteristics &amp; Spliterator.SORTED) != 0 &amp;&amp; spliterator.getComparator() != null) {
            // Do not propagate the SORTED characteristic if it does not correspond
            // to a natural sort order
            return characteristics &amp; SPLITERATOR_CHARACTERISTICS_MASK &amp; ~Spliterator.SORTED;
        }
        else {
            return characteristics &amp; SPLITERATOR_CHARACTERISTICS_MASK;
        }

如果具有SORTED屬性,同時getComparator()返回的不為null,則& ~Spliterator.SORTED會將sorted屬性抹去,
則此stream不具有sorted屬性。不具有sorted屬性,則stream的sorted方法,就直接按照字典序排序了。

3.2.3 sorted()方法還有一個可以傳入Comparator的重寫方法,如果使用了傳入Comparator的sorted方法,則以這個
Comparator進行排序,和原stream是否具有sorted屬性無關。
原始碼如下:

java.util.stream.SortedOps.OfRef
        OfRef(AbstractPipeline&lt;?, T, ?&gt; upstream) {
            super(upstream, StreamShape.REFERENCE,
                  StreamOpFlag.IS_ORDERED | StreamOpFlag.IS_SORTED);
            this.isNaturalSort = true;
            // Will throw CCE when we try to sort if T is not Comparable
            @SuppressWarnings(&quot;unchecked&quot;)
            Comparator&lt;? super T&gt; comp = (Comparator&lt;? super T&gt;) Comparator.naturalOrder();
            this.comparator = comp;
        }

        /**
         * Sort using the provided comparator.
         *
         * @param comparator The comparator to be used to evaluate ordering.
         */
        OfRef(AbstractPipeline&lt;?, T, ?&gt; upstream, Comparator&lt;? super T&gt; comparator) {
            super(upstream, StreamShape.REFERENCE,
                  StreamOpFlag.IS_ORDERED | StreamOpFlag.NOT_SORTED);
            this.isNaturalSort = false;
            this.comparator = Objects.requireNonNull(comparator);
        }

3.2.3 SIZED | SUBSIZED屬性

SIZED | SUBSIZED可以和estimateSize返回Long.MAX_VALUE一起配合使用。
如果stream沒有SIZED | SUBSIZED屬性,則可以將estimateSize返回為Long.MAX_VALUE.這說明這個stream
的estimateSize計算很複雜或本身就是一個infinite的steam流。這樣設定後,效能上會差一些,但是,不會對sorted
方法產生影響。2中提到的錯誤,也可也用這種方法處理。

最後附上全部程式碼:

NumCounterSpliterator3

import java.util.Comparator;
import java.util.Spliterator;
import java.util.function.Consumer;

public class NumCounterSpliterator3 implements Spliterator<Character> {

    private char[] str;
    private int currentChar = 0;
    private int end = Integer.MAX_VALUE;
    private boolean canSplit = true;

    public NumCounterSpliterator3(int currentChar,int end,char[] str,boolean canSplit) {
        this.str = str;
        this.currentChar = currentChar;
        this.canSplit = canSplit;
        this.end = end;
    }

    @Override
    public boolean tryAdvance(Consumer<? super Character> action) {
        action.accept( str[currentChar++] );
        return currentChar <= end;
    }

    @Override
    public Spliterator<Character> trySplit() {
        int i = currentChar;
        int currentCharOld = currentChar;
        for(;canSplit && i <= end; ++i){
            if(!Character.isDigit(str[i])){
                int splitBeforeEnd = end;
                canSplit = false;
                if(i + 1 &lt;= splitBeforeEnd){
                    currentChar = i + 1;
                    return new NumCounterSpliterator3(currentCharOld,i,str,true);
                }else{
                    return null;
                }
            }
        }

        canSplit = false;
        return null;
    }

    @Override
    public long estimateSize() {
        return end - currentChar + 1 /*Long.MAX_VALUE*/ ;
    }

    public Comparator<? super Character> getComparator() {
        return null;
    }

    @Override
    public int characteristics() {
        return ORDERED | SIZED | SUBSIZED | NONNULL | IMMUTABLE /*|SORTED*/;
    }
}

NumCounterTest2

import java.util.Spliterator;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class NumCounterTest2 {
    public static void main(String[] args) {
        String arr = "12%3 21sdas s34d dfsdz45   R3 jo34 sjkf8 3$1P 213ikflsd fdg55 kfd";
        System.out.println(arr);

        Spliterator<Character> spliterator = new NumCounterSpliterator3(0,arr.length()-1,arr.toCharArray(),true);
        // 傳入true表示是並行流
        Stream<Character> parallelStream = StreamSupport.stream(spliterator, true);
        System.out.println("parallel total: " + countNum(parallelStream));
        }

    private static int countNum(Stream<Character> stream){
        NumCounter numCounter = stream.reduce(new NumCounter(0, 0, false), NumCounter::accumulate, NumCounter::combine);
        return numCounter.getSum();
    }
}

【1】 java 8 parallelStream() with sorted()