1. 程式人生 > >大資料(十):MapTask工作機制與Shuffle機制(partitioner輸出分割槽、WritableComparable排序)

大資料(十):MapTask工作機制與Shuffle機制(partitioner輸出分割槽、WritableComparable排序)

一、MapTask工作機制

  1. Read階段:MapTask通過使用者編寫的RecordReader,從輸入InputSplit中解析出一個個key/value

  2. Map階段:該節點主要是將解析出的key/value交給使用者編寫map()函式處理,併產生一系列新的key/value。

  3. Collect收集階段:在使用者編寫map()函式中,當資料處理完成後,一般會呼叫OutputCollection.collect()輸出結果。在該函式內部,它會將生成的key/value分割槽(呼叫Partitioner),並寫入一個環形記憶體緩衝區中。

  4. Spill階段:即“溢寫”,當環形緩衝區滿後,MapReduce會將資料寫入本地磁碟上,生成一個臨時檔案。需要注意的是,將資料寫入本地磁碟之前,先要對資料進行一次本地排序,並在必要時對資料進行合併、壓縮等操作。

    1. 溢寫階段詳情:

      1. 利用快速排序演算法對快取區內的資料進行排序,排序方式是,先按照分割槽編號partition進行排序,然後按照key進行排序。這樣,經過排序後,資料以分割槽為單位聚集在一起,且同一分割槽內所有資料按照key有序。

      2. 按照分割槽編號由小到大依次將每個分割槽中的資料寫入任務工作目錄下的臨時檔案output/spillN.out(N表示當前溢寫次數)中。如果使用者設定Combiner,則寫入檔案之前,對每個分割槽中的資料進行一次聚集操作。

      3. 將分割槽資料的元資訊寫到記憶體索引資料結構SpillRecord中,其中每個分割槽的元資訊包括在臨時檔案中的偏移量、壓縮前資料大小和壓縮後資料大小。如果當前記憶體索引大小超過1MB,則將記憶體索引寫到檔案output/spillN.out.index中。

  5. Combine階段:當所有資料處理完成後,MapTask對所有臨時檔案進行一次合併,以確保最終只會生成一個數據檔案。

        當所有資料處理完後,MapTask會將所有臨時檔案合併成一個大檔案,並儲存到檔案output/file.out中,同時生成相應的索引檔案output/file.out.index。

        在進行檔案合併過程中,MapTask以分割槽為單位進行合併。對於某個分割槽,它將採用多輪遞迴合併的方式。每輪合併io.sort.factor(預設100)個檔案,並將產生的檔案重新加入待合併列表中,對檔案排序後,重複以上過程,直到最終得到一個大檔案。

        讓一個MapTask最終只生成一個數據檔案,可避免同時開啟大量檔案和同時讀取大量小檔案產生的隨機讀取帶來的開銷。

二、Shuffle機制

        MapReduce確保每個reducer的輸入都是按鍵排序的。系統執行排序的過程(即將map輸出作為輸入傳給reducer)稱為shuffle。

三、Partition分割槽

1、預設partition分割槽

public class HashPartitioner<K,V> extends Partition<K,V>{
    public int getPartition(K key,V value,int numReduceTasks){
        retuern (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}

2、自定義Partitioner步驟

  1. 自定義類繼承Partitioner,重寫getPartition()方法

  2. 在job驅動中,設定自定義Partitioner

    1. job.setPartitionerClass(CustomPartitioner.class);

  3. 自定義Partition後,要根據自定義Partitioner的邏輯設定相應數量的ReduceTask

    1. job.setNumReduceTasks(5)

3、注意

  1. 如果reduceTask的數量>getPartition的結果數,則會多產生幾個空的輸出檔案part-r-000xx

  2. 如果1<reduceTask的數量<getPartition的結果數,則有一部分分割槽資料無處安放,會丟擲異常

  3. 如果reduceTask的數量=1,則不管mapTask端輸出多少個分割槽檔案,最終結果都交給這一個reduceTask,最終也就只會產生一個結果檔案part-r-00000

4、自定義Partitioner例項:將統計結果按照手機歸屬地不同省份輸出到不同檔案

       1、根據手機號的前三位判斷省份,如:139******31是江蘇的,再統計出手機使用的流量總和

       2、準備資料新建txt檔案,每行資料格式:id 手機號 上行流量 下行流量 ip

       3、分析

    1. MapReduce中會將map輸入的kv對,按照相同的key分組,然後分發給不同的reducetask。預設的分發規則為:根據key的hashcode%reducetask數來分發

    2. 如果要按照我們自己的需求進行分組,則需要改寫資料分發(分組)元件Partitioner,自定義一個CustomPartitioner繼承抽象類Partitioner

    3. 在job驅動中,設定自定義的partitioner

    4. 最終輸出結果  手機號 上行流量 下行流量 總流量

       4、編寫bean,flowbean將會作為map的value,而手機號會作為key

public class FlowBean implements Writable {
    /**
    * 上行流量
    */
    private long upFlow;

    /**
    * 下行流量
    */
    private long downFlow;
    /**
    * 總流量
    */
    private long sumFlow;

    /**
    * 必須要有空參構造,為了後續反射用
    */
    public FlowBean() {
        super();
    }

    public FlowBean(long upFlow, long downFlow) {
        super();
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }

    public void set(long upFlow, long downFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }

    /**
    * 序列化方法
    */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    /**
    * 反序列化方法
    * 注意:序列化順序和反序列化順序必須一致
    */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.upFlow = in.readLong();
        this.downFlow = in.readLong();
        this.sumFlow = in.readLong();    

    @Override
    public String toString() {
       return upFlow + "\t" + downFlow + "\t" + sumFlow;

    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }
}

       5、編寫Mapper

public class FlowMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
    FlowBean v = new FlowBean();
    Text k = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 1 獲取一行
        String line = value.toString();

        // 2 切割
        String[] fields = line.split("\t");

        // 3 封裝物件
        // 手機號
        String phoneNum = fields[1];

        // 上行流量
        long upFlow = Long.parseLong(fields[fields.length - 3]);
        // 下行流量
        long downFlow = Long.parseLong(fields[fields.length - 2]);

        v.set(upFlow, downFlow);
        k.set(phoneNum);

        // 4 寫出資料
        context.write(k, v);
    }
}

       6、編寫Reducer

public class FlowReducer extends Reducer<Text, FlowBean, Text, FlowBean> {
    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {
    long sumUpFlow = 0;
    long sumDownFlow = 0;

    // 1 累加求和
    for (FlowBean flowBean : values) {
        sumUpFlow += flowBean.getUpFlow();
        sumDownFlow += flowBean.getDownFlow();
    }

    FlowBean flowBean = new FlowBean(sumUpFlow, sumDownFlow);

    // 2 輸出
    context.write(key, flowBean);
}

       7、編寫partitioner

public class ProvincePartitioner extends Partitioner<Text, FlowBean> {
    @Override
    public int getPartition(Text key, FlowBean value, int numPartitions) {
        // 1 獲取手機號碼
        String preNum = key.toString().substring(0, 3);
        int partition = 4;

        if ("136".equals(preNum)) {
            partition = 0;
        } else if ("137".equals(preNum)) {
            partition = 1;
        } else if ("138".equals(preNum)) {
            partition = 2;
        } else if ("139".equals(preNum)) {
            partition = 3;
        }
        return partition;
    }
}

       8、編寫Driver

public class FlowDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 1 獲取job物件
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        // 2 設定jar包路徑
        job.setJarByClass(FlowDriver.class);

        // 3 管理mapper和reducer類
        job.setMapperClass(FlowMapper.class);
        job.setReducerClass(FlowReducer.class);

        // 4 設定mapper輸出的kv型別
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);

        // 5 設定最終輸出kv型別
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // 設定分割槽
        job.setPartitionerClass(ProvincePartitioner.class);
        /*
        NumReduceTasks是生成檔案的個數 最好等於分類的個數
        等於1則沒有效果
        大於1小於分類個數則會報錯
        大約分類個數則會出現空檔案
        */
        job.setNumReduceTasks(5);
        // 6 設定輸入輸出路徑
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 7 提交
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}

       9、配置pargram arguments,需要處理的檔案所在資料夾和處理後輸出的資料夾(這個資料夾不可存在)

       10、執行程式檢視結果

四、WritableComparable排序

        排序是MapReduce框架中最重要的操作之一。MapTask和ReducerTask均會對資料(按照key)進行排序。該操作屬於Hadoop的預設行為。任何應用程式中的資料均會被排序,而不管邏輯上是否需要。預設排序是按照字典順序排序,且實現該排序的方法是快速排序。

        對於MapTask,它會將處理的結果暫時存放到一個緩衝區,當緩衝區使用率達到一定閾值後,在對緩衝區中的資料進行一次排序,並將這些有序資料寫到磁碟上,而當資料處理完畢後,它會對磁碟上所有檔案進行一次,合併以將這些檔案合併成一個大的有序檔案。

        對於ReduceTask,它從每一個MapTask上遠端拷貝相應的資料檔案,如果檔案大小超過閾值,則放在磁碟上,否則放到記憶體中。如果磁碟上檔案數目達到閾值,則進行一次合併以生成一個更大檔案;如果記憶體中檔案大小或者數目超過一定閾值,則進行一次合併後將資料寫到磁碟上,當所有資料拷貝完畢後,ReduceTask統一對記憶體和磁碟上的所有資料進行一次合併。

1、排序的分類:

  1. 部分排序:MapReduce根據輸入記錄的鍵對資料集排序。保證輸出的每個檔案內部排序。

  2. 全排序:首先建立一系列排好序的檔案;其次,串聯這些檔案;最後,生成一個全域性排序的檔案。主要思路是使用一個分割槽來描述輸出的全域性排序

  3. 輔助排序(GroupingComparatorf分割槽):MapReduce框架在記錄到達reducer之前按鍵對記錄排序,但鍵所對相應的值並沒有被排序。甚至在不同的執行輪次中,這些值的排序也不固定,因為它們來自不同的map任務且這些map任務在不同輪次中完成時間各不相同。一般來說,大多數MapReduce程式會避免讓reducer函式依賴於值的排序。但是,有時也需要通過特定的方法對鍵進行排序和分組等以實現對值的排序。

  4. 二次排序:在自定義排序過程中,如果compareTo中的判斷條件為兩個即為二次排序。

2、自定義排序WritableComparable

       bean物件實現WritableComparable介面重寫compareTo方法,就可以實現排序

3、自定義排序例項:根據上面例項產生的結果再次對總流量進行排序

       1.編寫bean

public class FlowBean implements WritableComparable<FlowBean>{
    /**
    * 上行流量
    */
    private long upFlow;
    /**
    * 下行流量
    */
    private long downFlow;
    /**
    * 總流量
    */
    private long sumFlow;

    /**
    * 必須要有空參構造,為了後續反射用
    */
    public FlowBean() {
        super();
    }

    public FlowBean(long upFlow, long downFlow) {
        super();
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }

    public void set(long upFlow, long downFlow){
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }

    /**
    * 序列化方法
    */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    /**
    * 反序列化方
    * 注意:序列化順序和反序列化順序必須一致
    */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.upFlow = in.readLong();
        this.downFlow = in.readLong();
        this.sumFlow = in.readLong();
    }

    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow ;
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }

    @Override
    public int compareTo(FlowBean o) {
        return (int) (this.sumFlow - o.getSumFlow());
    }
}

       2.編寫Mapper

public class FlowSortMapper extends Mapper<LongWritable, Text, FlowBean, Text>{
    FlowBean k = new FlowBean();
    Text v = new Text();
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 1 獲取一行
        String line = value.toString();
        // 2 切割
        String[] fields = line.split("\t");
        // 3 封裝物件
        long upFlow = Long.parseLong(fields[1]);
        long downFlow = Long.parseLong(fields[2]);
        k.set(upFlow, downFlow);
        v.set(fields[0]);
        // 4 寫出
        context.write(k, v);
    }
}

       3.編寫Reducer

public class FlowSortReducer extends Reducer<FlowBean, Text, Text, FlowBean>{
    @Override
    protected void reduce(FlowBean key, Iterable<Text> values, Context context)    throws IOException, InterruptedException {
        context.write(values.iterator().next(), key);
    }
}

       4.編寫Driver

public class FlowSortDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException,InterruptedException {
        // 1 獲取job物件
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        // 2 設定jar包路徑
        job.setJarByClass(FlowSortDriver.class);

        // 3 管理mapper和reducer類
        job.setMapperClass(FlowSortMapper.class);
        job.setReducerClass(FlowSortReducer.class);
    
        // 4 設定mapper輸出的kv型別
        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(Text.class);

        // 5 設定最終輸出kv型別
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // 6 設定輸入輸出路徑
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 7 提交
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}

相關推薦

資料MapTask工作機制Shuffle機制partitioner輸出分割槽WritableComparable排序

一、MapTask工作機制 Read階段:MapTask通過使用者編寫的RecordReader,從輸入InputSplit中解析出一個個key/value Map階段:該節點主要是將解析出的key/value交給使用者編寫map()函式處理,併產生一系列

資料自定義OutputFormatReduceJoin合併資料傾斜

一、OutputFormat介面         OutputFormat是MapReduce輸出的基類,所有實現MapReduce輸出都實現了OutputFormat介面。 1.文字輸出TextOutPutFormat  &n

資料開發面試部分正常工作的Hadoop叢集中Hadoop都分別需要啟動哪些程序,它們的作用分別是什麼資料開發面試

啟動的程序: namenode socondarynamenode datanode ResourceManager(JobTracker)JobTracker NodeManager(TaskTracker) DFSZKFailoverController J

深度學習框架Keras學習系列線性代數基礎numpy使用Linear Algebra Basis and Numpy

又開一個新坑~~ 因為確實很有必要好好地趁著這個熱潮來研究一下深度學習,畢竟現在深度學習因為其效果突出,熱潮保持高漲不退,上面的政策方面現在也在向人工智慧領域傾斜,但是也有無數一知半解的人跟風吹捧,於是希望藉此教程,讓自己和讀者一起藉助keras,從上到下逐漸

資料-Hadoop生態(17)-MapReduce框架原理-MapReduce流程,Shuffle機制,Partition分割槽

 MapReduce工作流程 1.準備待處理檔案 2.job提交前生成一個處理規劃 3.將切片資訊job.split,配置資訊job.xml和我們自己寫的jar包交給yarn 4.yarn根據切片規劃計算出MapTask的數量 (以一個MapTask為例) 5.Maptask呼叫

資料多job串聯ReduceTask工作機制

一、多job串聯例項(倒索引排序) 1.需求 查詢每個單詞分別在每個檔案中出現的個數   預期第一次輸出(表示單詞分別在個個檔案中出現的次數) apple--a.txt 3 apple--b.txt 1 apple--c.txt 1 grape--a.txt

資料NameNode工作機制

一、NameNode和Secondary NameNode工作機制 左側為NameNode  右側為Secondary NameNode 1.第一階段:namenode啟動 第一次啟動namenode格式化後,建立fsimage和edits檔案,如果不是第一

貨拉拉資料總監劉幸資料&智慧化體系的建立 | 2018FMI人工智慧資料高峰論壇深圳站

10月28日FMI 2018人工智慧與大資料高峰論壇深圳場圓滿落幕,貨拉拉大資料總監劉幸以大資料&智慧化體系的建立為主題進行了精彩的分享。   貨拉拉大資料總監 劉幸 以下是劉幸演講內容,飛馬網根據現場速記進行了不改變原意的編輯(有刪減):  

拉開變革序幕分散式計算框架資料

因為對大資料處理的需求,使得我們不斷擴充套件計算能力,叢集計算的要求導致分散式計算框架的誕生,用廉價的叢集計算資源在短短的時間內完成以往數週甚至數月的執行等待,有人說誰掌握了龐大的資料,誰就主導了需求。雖然在十幾年間,通過過去幾十年的積澱,誕生了mapreduc

資料生態系統基礎Apache Kafka基礎介紹和安裝

http://blog.csdn.net/zhy_yz/article/details/5905637 一、 Apache kafka基礎介紹           1、kafka 是什麼?              首先一句話: Apache Kaf

資料開發面試部分Hadoop 中 job 和 task 之間的區別是什麼資料開發面試

JobTracker 是一個 master 服務,軟體啟動之後 JobTracker 接收 Job,負責排程 Job的每一個子任務, task 運行於 TaskTracker 上,並監控它們,如果發現有失敗的 task 就重新執行它。一般情況應該把 JobTracker 部署

學習資料第五天最小二乘法的Python實現

1.numpy.random.normal numpy.random.normal numpy.random.normal(loc=0.0, scale=1.0, size=None) Draw random samples from a normal (Gaussi

資料生態系統基礎 HBASEHBASE 介紹及安裝配置

一、介紹        Apache HBase是Hadoop資料庫,一個分散式的、可伸縮的大型資料儲存。        當您需要隨機的、實時的讀/寫訪問您的大資料時,請使用Apache HBase。這個專案的目標是承載非常大的表——數十億行X百萬列的列——執行在在商用硬體

資料開發面試部分對yarn的理解資料開發面試

YARN是Hadoop2.0版本引進的資源管理系統,直接從MR1演化而來。  核心思想:將MR1中的JobTracker的資源管理和作業排程兩個功能分開,分別由ResourceManager和ApplicationMaster程序實現。 ResourceManager:負

C#資料結構算法系列逆波蘭計算器——逆波蘭表示式字尾表示式

1.介紹 字尾表示式又稱逆波蘭表示式,與字首表示式相似,只是運算子位於運算元之後 2.舉例說明 (3+4)*5-6對應的字尾表示式就是3 4 +5 * 6 - 3.示例 輸入一個逆波蘭表示式(字尾表示式),使用棧(Stack),計算其結果 思路分析: 從左至右掃描表示式,遇到數字時,將數字壓入堆疊,遇到運算

Android開發系列Notification的功能使用方法

font _id when ice extends 開發 content androi mark 關於消息的提示有兩種:一種是Toast,一種就是Notification。前者維持的時間比較短暫,後者維持的時間比較長。 並且我們尋常手機的應用比方網易、貼吧等等都有非常多

Android項目實戰Android集成Unity3D項目圖文詳解

jar包沖突 scree pmap module 項目實戰 技術 詳細 應用端 原來 原文:Android項目實戰(三十九):Android集成Unity3D項目(圖文詳解)  需求:   Unity3D 一般用於做遊戲 而且是跨平臺的。原本設計是Android 應用端A

Redis(二一)Redis效能問題排查解決手冊

效能相關的資料指標 通過Redis-cli命令列介面訪問到Redis伺服器,然後使用info命令獲取所有與Redis服務相關的資訊。通過這些資訊來分析文章後面提到的一些效能指標。 info命令輸出的資料可分為10個類別,分別是: server clients memory persis

通證經濟大局觀私有制的崛起家庭的出現

私有制的崛起 在原始社會後期,由於金屬工具的出現、馴化技術的提高、刀耕火種農業的發明,使得部落內部開始出現剩餘物品,且越來越多。 這時候問題出現了,這些多出來的物品歸誰呢? 我們知道,在漫長的石器時代,人類都是處於物資匱乏的境地。東西不多,所以只能由部落統一分配,以保證有限的資源能讓大家一起活下

Spark2.2+ES6.4.2ES API之ndex的create建立index時設定setting,並建立index後根據avro模板動態設定index的mapping/update/delete/open/close

要想通過ES API對es的操作,必須獲取到TransportClient物件,讓後根據TransportClient獲取到IndicesAdminClient物件後,方可以根據IndicesAdminClient物件提供的方法對ES的index進行操作:create index,update inde