hadoop 切片機制分析與應用

前言

上面是一張MapReduce讀取一個文本數據的邏輯順序處理圖。我們知道,不管是本地運行還是集群模式下,最終以job的任務調度形式運行,主要分為兩個階段

  • Map階段,開啟MapTask處理數據的讀取
  • Reduce階段,開啟ReduceTask對數據做聚合

比如在wordcount案例中,一段文本數據,在map階段首先被解析,拆分成一個個的單詞,其實對hadoop來說,這項工作的完成,是由背後開啟的一個MapTask進行處理的,等job處理完成,看到在目標文件夾下,生成瞭對應的單詞統計結果

如果有多個單詞統計文本文件要處理呢?我們不妨改造下wordcount的job代碼,在一個目錄下放多個處理文件,看運行完畢的結果如何呢?

public static void main(String[] args) throws Exception {

        //1、獲取job
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        //2、設置jar路徑
        job.setJarByClass(DemoJobDriver.class);

        //3、關聯mapper 和 Reducer
        job.setMapperClass(DemoMapper.class);
        job.setReducerClass(DemoReducer.class);

        //4、設置 map輸出的 key/val 的類型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        //5、設置最終輸出的key / val 類型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        //6、設置最終的輸出路徑
        String inputPath = "F:\\網盤\\csv\\combines\\";
        String outPath = "F:\\網盤\\csv\\result";

        FileInputFormat.setInputPaths(job,new Path(inputPath));
        FileOutputFormat.setOutputPath(job,new Path(outPath));

        // 7 提交job
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);

    }

運行完畢後,我們隨機截取幾張運行中的日志,通過閱讀關鍵信息,相信感興趣的同學能看出點什麼吧,總結下來

  • 如果一個job開啟後,檢查到一個目錄下包含多個待處理文件,將會開啟多個MapTask處理
  • 默認情況下,有多少個文件,就開啟多少個MapTask任務
  • ReduceTask處理完畢後,對結果做歸並

Hadoop任務並行化

使用hadoop或者其他大數據框架的一個很重要的原因在於,它們的底層設計能很好的支撐任務的並行化處理,也就是說,會充分利用服務器的配置將一個復雜的任務,或者單個Task處理起來很耗時的任務根據需要拆分成多個並行的子任務來處理,充分利用服務器性能,達到任務處理的最優,耗時最短,機器性能利用率最佳

hadoop同樣如此,提供瞭很多配置參數提供客戶端選擇,從而提升任務的處理性能

我們知道,hadoop的任務處理主要分為2個階段,Map和Reduce階段,默認情況下,結合上面的案例可以知道,將會根據文件個數默認開啟等量的MapTask去處理,但是我們設想這樣一個問題,現在的文件比較小,尚未超過默認的一個blocksize ,即128M,如果超出瞭怎麼辦?甚至說這個文件達到1個G怎麼辦?

於是得出如下結論:

MapTask的並行度決定Map階段的任務處理並發度,進而影響到整個Job的處理速度

這樣說來,當要處理的某個文件特別大的時候,通過設置MapTask的並行度是可以提升整個Map階段的處理速度的

思考:1G的數據,啟動8個MapTask,可以提高集群的並發處理能力。那麼1K的數據,也啟動8個MapTask,會提高集群性能嗎?MapTask並行任務是否越多越好呢?哪些因素影響瞭MapTask並行度?

MapTask並行度決定機制

  • 數據塊:Block是HDFS物理上的單位,即把數據分成一塊一塊的,數據塊是HDFS存儲數據單位數據切片:數據切片隻在邏輯上對輸入進行分片,並不是真的會在磁盤上將其切分成多個片進行存儲。
  • 數據切片是MapReduce程序計算輸入數據的單位,一個切片會對應啟動一個MapTask。

設想一段300MB大小的文件,假如按照100MB為一個切片的話,可以分為3個切片,這樣Map階段將會開啟3個MapTask來處理這個任務,但是默認情況下,Hadoop處理的一個文件塊即block的size大小為128Mb,那麼問題來瞭,假如在生產環境下,hadoop真正在分佈式環境下運行,任務往往分佈在不同的機器上運行的,如下圖所示

  • node1 ~ node3 可認為是集群中3個節點,用於處理MapTask數據
  • 默認情況下,每次處理一個任務默認的文件數據庫大小為128Mb
  • 300Mb的待處理文件,按照100Mb為一個切片,將會劃分為3個切片,3個切片將會開啟3個MapTask進行處理

以上按照直觀的理解,可以歸納出上面幾點,但仔細分析下,會發現另一個問題就是,切片規則是客戶端人為指定的規則,可以理解為一個賬本,上面記錄瞭工人幹活時的工時,從而為結工資的時候做考量

但是對於3個節點來說,它們可不這麼想瞭,因為它們是真正幹活即執行任務的,人傢管你是什麼切片規則呢?總不能按照你的100MB大小的切片規則將自己的默認的128MB的數據庫大小也改為100MB吧?這顯然是不可能的,那該怎麼辦呢?

既然你切片上的規則是100MB嘛,於是node1節點就按照你的規則來,我這個節點上就處理100MB大小的數據就完事瞭,還剩下28MB大小的數據怎麼辦?既然分成瞭3個切片,肯定要開啟3個MapTask瞭,node2節點也要處理一個任務瞭,但是不能隨意就處理數據啊,得先把node1節點上面那個28MB的未處理完畢的文件拷貝過來,再拼接出72MB大小的數據塊,湊夠100MB瞭再搞事

於是,如果在真正的分佈式環境下,這樣就存在一個數據文件的跨節點拷貝問題,這很顯然會帶來一部分的網絡開銷,如果數據文件較大話,這個性能損耗就很值得考慮瞭

按照以上理解,我們可總結出如下經驗:

  • 一個Job的Map階段的並行度由客戶端在提交Job時候的切片數量決定
  • 每一個切片將會被分配一個MapTask進行處理
  • 默認情況下,如果不指定,切片大小 = BlockSize的塊大小,這也是最優的處理
  • 切片時不考慮數據整體,而是針對每一個文件單獨切片

Hadoop默認切片機制

默認情況下,不做任何設置的話,hadoop將采用FileInputFormat切片機制,簡單來說,原理如下:

  • 簡單的按照文件內容長度進行切片
  • 切片大小,默認等於128MB,即blocksize的大小
  • 切片時不考慮數據整體,而是針對每一個文件單獨切片

這個相對來說,比較簡單,就不再過多贅述瞭,可以通過源碼調試,找到下面的writeNewSplits 方法,進去看看源碼的做法

Hadoop TextInputFormat 的優化切片機制

FileInputFormat實現類

在編寫job的main程序中,還記得最後設置讀取文件和輸出文件的兩行代碼

在運行MapReduce程序時,輸入的文件格式有很多種,比如:基於行的日志文件、二進制格式文件、數據庫表等。那麼,針對不同的數據類型,MapReduce是如何讀取這些數據的呢?

FileInputFormat常見的接口實現類包括:TextInputFormat、KeyValueTextInputFormat、NLineInputFormat、CombineTextInputFormat和自定義InputFormat等。

最常見的就是TextInputFormat

  • TextInputFormat是默認的FileInputFormat實現類
  • 按行讀取每條記錄
  • 鍵是存儲該行在整個文件中的起始字節偏移量, LongWritable類型
  • 值是這行的內容,不包括任何行終止符(換行符和回車符),Text類型。

以下是一個示例,比如,一個分片包含瞭如下4條文本記錄

Rich learning form
Intelligent learning engine
Learning more convenient
From the real demand for more close to the enterprise

每條記錄可以表示為以下鍵/值對:

(0,Rich learning form)
(20,Intelligent learning engine)
(49,Learning more convenient)
(74,From the real demand for more close to the enterprise)

CombineTextInputFormat切片機制

從上面的分析來看,框架默認的TextInputFormat切片機制,是對任務按文件規劃切片,不管文件多小,都會作為一個單獨的切片,交給一個MapTask,假如有大量小文件,就會產生大量的MapTask,從而處理效率上並不高

於是就可以考慮另一種切片機制,即CombineTextInputFormat

CombineTextInputFormat應用場景

CombineTextInputFormat 用於小文件過多的場景,它可將多個小文件從邏輯上規劃到一個切片中,這樣,多個小文件就可以交給一個MapTask處理

比如上面幾個文件,最大的隻有不到7MB,最下的隻有不到2MB,那麼基於CombineTextInputFormat的切片機制,可以考慮使用這種切片來做,具體的設置在job任務的代碼中按照下面這樣做設置

CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);// 4m 或者其他數據

註意:虛擬存儲切片最大值設置最好根據實際的小文件大小情況來設置具體的值,這個可以按照總體文件的大小,獲取一個中位數比較好

CombineTextInputFormat案例代碼演示

使用上面的4個文件作為輸入數據源,期望隻需要使用一個切片處理4個文件(默認情況下,4個文件將會啟動4個切片和4個MapTask,從控制臺日志中觀察)

使用CombineTextInputFormat的切片,大概如下面的實現過程

  • 不做任何處理,運行上面的的WordCount案例程序,觀察切片個數為4(控制臺日志)
  • 在Job的代碼中增加如下代碼,運行程序,並觀察運行的切片個數為1
// 如果不設置InputFormat,它默認用的是TextInputFormat.class
job.setInputFormatClass(CombineTextInputFormat.class);

//虛擬存儲切片最大值設置20m
CombineTextInputFormat.setMaxInputSplitSize(job, 20971520);
  • 先設置為4MB,然後嘗試設置為20MB,觀察運行結果是否為1個切片,number of splits:1

通過控制臺的輸出結果,驗證瞭上面的目標猜想

到此這篇關於hadoop 切片機制分析與應用的文章就介紹到這瞭,更多相關hadoop 切片機制內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!

推薦閱讀: