hadoop 全面解讀自定義分區

分區概念

分區這個詞對很多同學來說並不陌生,比如Java很多中間件中,像kafka的分區,mysql的分區表等,分區存在的意義在於將數據按照業務規則進行合理的劃分,方便後續對各個分區數據高效處理

Hadoop分區

hadoop中的分區,是把不同數據輸出到不同reduceTask ,最終到輸出不同文件中

hadoop 默認分區規則

  • hash分區
  • 按照key的hashCode % reduceTask 數量 = 分區號
  • 默認reduceTask 數量為1,當然也可以在driver 端設置

以下是Partition 類中摘取出來的源碼,還是很容易懂的

hash分區代碼演示

下面是wordcount案例中的driver部分的代碼,默認情況下我們不做任何設置,最終輸出一個統計單詞個數的txt文件,如果我們在這段代碼中添加這樣一行

再次運行下面的程序後,會出現什麼結果呢?

public class DemoJobDriver {

    public static void main(String[] args) throws Exception {

        //1、獲取job
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        //2、設置jar路徑
        job.setJarByClass(DemoJobDriver.class);

        //3、關聯mapper 和 Reducer
        job.setMapperClass(DemoMapper.class);
        job.setReducerClass(DemoReducer.class);

        //4、設置 map輸出的 key/val 的類型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        //5、設置最終輸出的key / val 類型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        //6、設置最終的輸出路徑
        String inputPath = "F:\\網盤\\csv\\hello.txt";
        String outPath = "F:\\網盤\\csv\\wordcount\\hello_result.txt";

        //設置輸出文件為2個
        job.setNumReduceTasks(2);

        FileInputFormat.setInputPaths(job,new Path(inputPath));
        FileOutputFormat.setOutputPath(job,new Path(outPath));

        // 7 提交job
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }

}

可以看到,最終輸出瞭2個統計結果文件,每個文件中的內容有所不同,這就是默認情況下,當reducer個數設置為多個時,會按照hash分區算法計算結果並輸出到不同分區對應的文件中去

自定義分區步驟

  • 自定義類繼承Partitioner
  • 重寫getPartition方法,並在此方法中根據業務規則控制不同的數據進入到不同分區
  • 在Job的驅動類中,設置自定義的Partitioner類
  • 自定義Partition後,要根據自定義的Partition邏輯設置相應數量的ReduceTask

業務需求

將下面文件中 的人物名稱按照姓氏,“馬”姓的放入第一個分區,“李”姓的放入第二個分區,其他的放到其他第三個分區中

自定義分區

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.io.Text;

public class MyPartioner extends Partitioner<Text, IntWritable> {

    @Override
    public int getPartition(Text text, IntWritable intWritable, int partion) {
        String key = text.toString();
        if(StringUtils.isNotEmpty(key.trim())){
            if(key.startsWith("馬")){
                partion = 0;
            }else if(key.startsWith("李")){
                partion = 1;
            }else {
                partion = 2;
            }
        }
        return partion;
    }
}

將自定義分區關聯到Driver類中,註意這裡的ReduceTasks個數和自定義的分區數量保持一致

job.setNumReduceTasks(3);
job.setPartitionerClass(MyPartioner.class);

下面運行Driver類,觀察最終的輸出結果,也是按照預期,將不同的姓氏數據輸出到瞭不同的文件中

關於自定義分區的總結

  • 如果ReduceTask的數量 > 自定義partion中的分區數量,則會多產生幾個空的輸出文件
  • 如果 1 < ReduceTask < 自定義partion中的分區數量,有一部分的數據處理過程中無法找到相應的分區文件存儲,會拋異常
  • 如果ReduceTask = 1 ,則不管自定義的partion中分區數量為多少個,最終結果都隻會交給這一個ReduceTask 處理,最終隻會產生一個結果文件
  • 分區號必須從0開始,逐一累加

到此這篇關於hadoop 全面解讀自定義分區的文章就介紹到這瞭,更多相關hadoop 自定義分區內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!

推薦閱讀: