1. 程式人生 > >Hadoop MapReduce二次排序演算法與實現之演算法解析

Hadoop MapReduce二次排序演算法與實現之演算法解析

MapReduce二次排序的原理

    1.在Mapper階段,會通過inputFormat的getSplits來把資料集分割成split

public abstract class InputFormat<K, V> {
		public InputFormat() {}
		public abstract List<InputSplit> getSplits(JobContext var1) throws IOException, InterruptedException;

    2.inputFormat會提供RecordReader來讀取每一條Record,讀取之後會傳給map來接收和處理

public abstract RecordReader<K, V> createRecordReader(InputSplit var1, TaskAttemptContext var2) throws IOException, InterruptedException;

    3.子啊Mapper階段最後通過partition對Mapper的計算記過進行分割槽,可以通過job的setPartitionerClass來自定義Partitioner    

public abstract class Partitioner<KEY, VALUE> {
		public Partitioner() {}

		public abstract int getPartition(KEY var1, VALUE var2, int var3);
	}

		public void setPartitionerClass(Class<? extends Partitioner> cls) throws IllegalStateException {
		    this.ensureState(Job.JobState.DEFINE);
		    this.conf.setClass("mapreduce.job.partitioner.class", cls, Partitioner.class);
		}

    4.在每個分割槽內可以呼叫Job的setSortComparatorClass來對分割槽內基於Key比較函式進行排序

public void setSortComparatorClass(Class<? extends RawComparator> cls) throws IllegalStateException {
		    this.ensureState(Job.JobState.DEFINE);
		    this.conf.setOutputKeyComparatorClass(cls);
		}

	public interface RawComparator<T> extends Comparator<T> {
		int compare(byte[] var1, int var2, int var3, byte[] var4, int var5, int var6);
	}

    如果沒有設定則會呼叫key 的compareTo方法     5. 在Reducer端會接收到所有Mapper端中屬於自己的資料,其實我們可以通過Job的setComparatorClass來對當前Reducer收到的所有的資料基於key比較函式進行排序,由於Reducer端每一個key對應的是valueList,因此需要Job的的setGroupingComparator來設定分組函式的類;   

  public void setCombinerKeyGroupingComparatorClass(Class<? extends RawComparator> cls) throws IllegalStateException {
            this.ensureState(Job.JobState.DEFINE);
            this.conf.setCombinerKeyGroupingComparator(cls);
        }

    6.最後呼叫Reducer對自己收到的資料進行最後的處理

    總結:     Mapper端:         設定partitioner控制partition的過程;         設定Comparator控制每個分割槽內的排序;     Reducer端:         設定Comparator控制reduce收到的所有資料的排序;         設定GroupingComparator對資料進行Group操作;