Java基礎之MapReduce框架總結與擴展知識點

一、MapTask工作機制

MapTask就是Map階段的job,它的數量由切片決定

在這裡插入圖片描述

二、MapTask工作流程:

1.Read階段:讀取文件,此時進行對文件數據進行切片(InputFormat進行切片),通過切片,從而確定MapTask的數量,切片中包含數據和key(偏移量)

2.Map階段:這個階段是針對數據進行map方法的計算操作,通過該方法,可以對切片中的key和value進行處理

3.Collect收集階段:在用戶編寫map()函數中,當數據處理完成後,一般會調用OutputCollector.collect()輸出結果。在該函數內部,它會將生成的key/value分區(調用Partitioner),並寫入一個環形內存緩沖區中。

4.Spill階段:即“溢寫”,當環形緩沖區滿後,MapReduce會將數據寫到本地磁盤上,生成一個臨時文件。需要註意的是,將數據寫入本地磁盤之前,先要對數據進行一次本地排序,並在必要時對數據進行合並、壓縮等操作。

5.Combine階段:當所有數據處理完成後,MapTask對所有臨時文件進行一次合並,以確保最終隻會生成一個數據文件,這個階段默認是沒有的,一般需要我們自定義

6.當所有數據處理完後,MapTask會將所有臨時文件合並成一個大文件,並保存到文件output/file.out中,同時生成相應的索引文件output/file.out.index。

7.在進行文件合並過程中,MapTask以分區為單位進行合並。對於某個分區,它將采用多輪遞歸合並的方式。每輪合並io.sort.factor(默認10)個文件,並將產生的文件重新加入待合並列表中,對文件排序後,重復以上過程,直到最終得到一個大文件。

8.讓每個MapTask最終隻生成一個數據文件,可避免同時打開大量文件和同時讀取大量小文件產生的隨機讀取帶來的開銷

第四步溢寫階段詳情:

  • 步驟1:利用快速排序算法對緩存區內的數據進行排序,排序方式是,先按照分區編號Partition進行排序,然後按照key進行排序。這樣,經過排序後,數據以分區為單位聚集在一起,且同一分區內所有數據按照key有序。
  • 步驟2:按照分區編號由小到大依次將每個分區中的數據寫入任務工作目錄下的臨時文件output/spillN.out(N表示當前溢寫次數)中。如果用戶設置瞭Combiner,則寫入文件之前,對每個分區中的數據進行一次聚集操作。
  • 步驟3:將分區數據的元信息寫到內存索引數據結構SpillRecord中,其中每個分區的元信息包括在臨時文件中的偏移量、壓縮前數據大小和壓縮後數據大小。如果當前內存索引大小超過1MB,則將內存索引寫到文件output/spillN.out.index中。

三、ReduceTask工作機制

ReduceTask就是Reduce階段的job,它的數量由Map階段的分區進行決定

在這裡插入圖片描述

四、ReduceTask工作流程:

1.Copy階段:ReduceTask從各個MapTask上遠程拷貝一片數據,並針對某一片數據,如果其大小超過一定閾值,則寫到磁盤上,否則直接放到內存中。

2.Merge階段:在遠程拷貝數據的同時,ReduceTask啟動瞭兩個後臺線程對內存和磁盤上的文件進行合並,以防止內存使用過多或磁盤上文件過多。

3.Sort階段:按照MapReduce語義,用戶編寫reduce()函數輸入數據是按key進行聚集的一組數據。為瞭將key相同的數據聚在一起,Hadoop采用瞭基於排序的策略。由於各個MapTask已經實現對自己的處理結果進行瞭局部排序,因此,ReduceTask隻需對所有數據進行一次歸並排序即可。

4.Reduce階段:reduce()函數將計算結果寫到HDFS上

五、數據清洗(ETL)

我們在大數據開篇概述中說過,數據是低價值的,所以我們要從海量數據中獲取到我們想要的數據,首先就需要對數據進行清洗,這個過程也稱之為ETL

還記得上一章中的Join案例麼,我們對pname字段的填充,也算數據清洗的一種,下面我通過一個簡單的案例來演示一下數據清洗

數據清洗案例

需求:過濾一下log日志中字段個數小於11的日志(隨便舉個栗子而已)

測試數據:就拿我們這兩天學習中HadoopNodeName產生的日志來當測試數據吧,我將log日志信息放到我的windows中,數據位置如下

/opt/module/hadoop-3.1.3/logs/hadoop-xxx-nodemanager-hadoop102.log

編寫思路:

直接通過切片,然後判斷長度即可,因為是舉個栗子,沒有那麼復雜

真正的數據清洗會使用框架來做,這個我後面會為大傢帶來相關的知識

  • ETLDriver
package com.company.etl;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class ETLDriver {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration());

        job.setJarByClass(ETLDriver.class);

        job.setMapperClass(ETLMapper.class);

        job.setNumReduceTasks(0);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);


        FileInputFormat.setInputPaths(job,new Path("D:\\io\\input8"));
        FileOutputFormat.setOutputPath(job,new Path("D:\\io\\output88"));

        job.waitForCompletion(true);
    }
}

  • ETLMapper
package com.company.etl;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class ETLMapper extends Mapper<LongWritable, Text,Text, NullWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        //清洗(過濾)
        String line = value.toString();
        String[] info = line.split(" ");
        //判斷
        if (info.length > 11){
            context.write(value,NullWritable.get());
        }
    }
}

六、計數器應用

  • 顧名思義,計數器的作用就是用於計數的,在Hadoop中,它內部也有一個計數器,用於監控統計我們處理數據的數量
  • 我們通常在MapReduce中通過上下文 context進行應用,例如在Mapper中,我通過step方法進行初始化計數器,然後在我們map方法中進行計數

七、計數器案例

在上面數據清洗的基礎上進行計數器的使用,Driver沒什麼變化,隻有Mapper

我們在Mapper的setup方法中,創建計數器的對象,然後在map方法中調用它即可

ETLMapper

package com.company.etl;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class ETLMapper extends Mapper<LongWritable, Text,Text, NullWritable> {

    private Counter sucess;
    private Counter fail;
    /*
        創建計數器對象
     */
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        /*
             getCounter(String groupName, String counterName);
             第一個參數 :組名 隨便寫
             第二個參數 :計數器名 隨便寫
         */
        sucess = context.getCounter("ETL", "success");
        fail = context.getCounter("ETL", "fail");

    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        //清洗(過濾)
        String line = value.toString();
        String[] info = line.split(" ");
        //判斷
        if (info.length > 11){
            context.write(value,NullWritable.get());
            //統計
            sucess.increment(1);
        }else{
            fail.increment(1);
        }

    }
}

八、MapReduce總結

好瞭,到這裡,我們MapReduce就全部學習完畢瞭,接下來,我再把整個內容串一下,還是MapReduce的那個圖

在這裡插入圖片描述

MapReduce的主要工作就是對數據進行運算、分析,它的工作流程如下:

1.我們會將HDFS中的數據通過InputFormat進行進行讀取、切片,從而計算出MapTask的數量

2.每一個MapTask中都會有Mapper類,裡面的map方法就是任務的具體實現,我們通過它,可以完成數據的key,value封裝,然後通過分區進入shuffle中來完成每個MapTask中的數據分區排序

3.通過分區來決定ReduceTask的數量,每一個ReduceTask都有一個Reducer類,裡面的reduce方法是ReduceTask的具體實現,它主要是完成最後的數據合並工作

4.當Reduce任務過重,我們可以通過Combiner合並,在Mapper階段來進行局部的數據合並,減輕Reduce的任務量,當然,前提是Combiner所做的局部合並工作不會影響最終的結果

5.當Reducer的任務完成,會將最終的key,value寫出,交給OutputFormat,用於數據的寫出,通過OutputFormat來完成HDFS的寫入操作

每一個MapTask和ReduceTask內部都是循環進行讀取,並且它有三個方法:setup() map()/reduce() cleanup()
setup()方法是在MapTask/ReduceTask剛剛啟動時進行調用,cleanup()是在任務完成後調用

到此這篇關於Java基礎之MapReduce框架總結與擴展知識點的文章就介紹到這瞭,更多相關Java MapReduce框架內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!

推薦閱讀: